Skip to content

Commit

Permalink
[CONTROLLER/HTTP] adds organization db api
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya committed Mar 7, 2024
1 parent 0e80283 commit eb96031
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 108 deletions.
2 changes: 2 additions & 0 deletions server/controller/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var GConfig *GlobalConfig
const GO_BIRTHDAY = "2006-01-02 15:04:05"
const K8S_CA_CRT_PATH = "/run/secrets/kubernetes.io/serviceaccount/ca.crt"

const DATABASE_PREFIX = "deepflow_"

const (
REMOTE_API_TIMEOUT = 30
INGESTER_API_PORT = 30106
Expand Down
4 changes: 2 additions & 2 deletions server/controller/controller/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func IsMasterController(cfg *config.ControllerConfig) bool {

// migrate db by master region master controller
func migrateMySQL(cfg *config.ControllerConfig) {
ok := migrator.MigrateMySQL(cfg.MySqlCfg)
if !ok {
err := migrator.MigrateMySQL(cfg.MySqlCfg)
if err != nil {
log.Error("migrate mysql failed")
time.Sleep(time.Second)
os.Exit(0)
Expand Down
26 changes: 8 additions & 18 deletions server/controller/db/mysql/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,8 @@ func CreateDatabaseIfNotExists(db *gorm.DB, database string) (bool, error) {
}
}

func DropDatabaseIfInitTablesFailed(db *gorm.DB, database string) bool {
log.Info("drop database if init tables failed")
err := InitTables(db)
if err != nil {
err := DropDatabase(db, database)
if err != nil {
log.Errorf("drop database %s failed: %v", database, err)
}
return false
}
return true
}

func InitTables(db *gorm.DB) error {
log.Info("init db tables start")
func InitEETables(db *gorm.DB) error {
log.Info("init CE tables start")
initSQL, err := ioutil.ReadFile(fmt.Sprintf("%s/init.sql", SQL_FILE_DIR))
if err != nil {
log.Errorf("read sql file failed: %v", err)
Expand All @@ -82,12 +69,15 @@ func InitTables(db *gorm.DB) error {
log.Errorf("init db tables failed: %v", err)
return err
}
err = db.Exec(fmt.Sprintf("INSERT INTO db_version (version) VALUE ('%s')", migration.DB_VERSION_EXPECTED)).Error
log.Info("init CE tables success")
return err
}

func InitDBVersion(db *gorm.DB) error {
err := db.Exec(fmt.Sprintf("INSERT INTO db_version (version) VALUE ('%s')", migration.DB_VERSION_EXPECTED)).Error
if err != nil {
log.Errorf("init db version failed: %v", err)
return err
}
log.Info("init db tables success")
return err
}

Expand Down
17 changes: 9 additions & 8 deletions server/controller/db/mysql/gorm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var DbConfig MySqlConfig

func InitMySQL(cfg MySqlConfig) error {
DbConfig = cfg
Db = Gorm(cfg)
Db, _ = Gorm(cfg)
if Db == nil {
return errors.New("connect mysql failed")
}
Expand All @@ -54,7 +54,7 @@ func InitMySQL(cfg MySqlConfig) error {
return nil
}

func Gorm(cfg MySqlConfig) *gorm.DB {
func Gorm(cfg MySqlConfig) (*gorm.DB, error) {
dsn := GetDSN(cfg, cfg.Database, cfg.TimeOut, false)
return GetGormDB(dsn)
}
Expand All @@ -63,12 +63,12 @@ func GetResultSetMax() int {
return int(DbConfig.ResultSetMax)
}

func GetConnectionWithoutDatabase(cfg MySqlConfig) *gorm.DB {
func GetConnectionWithoutDatabase(cfg MySqlConfig) (*gorm.DB, error) {
dsn := GetDSN(cfg, "", cfg.TimeOut, false)
return GetGormDB(dsn)
}

func GetConnectionWithDatabase(cfg MySqlConfig) *gorm.DB {
func GetConnectionWithDatabase(cfg MySqlConfig) (*gorm.DB, error) {
// set multiStatements=true in dsn only when migrating MySQL
dsn := GetDSN(cfg, cfg.Database, cfg.TimeOut*2, true)
return GetGormDB(dsn)
Expand All @@ -90,7 +90,7 @@ func GetDSN(cfg MySqlConfig, database string, timeout uint32, multiStatements bo
return dsn
}

func GetGormDB(dsn string) *gorm.DB {
func GetGormDB(dsn string) (*gorm.DB, error) {
Db, err := gorm.Open(mysql.New(mysql.Config{
DSN: dsn, // DSN data source name
DefaultStringSize: 256, // string 类型字段的默认长度
Expand All @@ -110,14 +110,15 @@ func GetGormDB(dsn string) *gorm.DB {
}), // 配置log
})
if err != nil {
log.Errorf("Mysql Connection failed with error: %v", err.Error())
return nil
err = errors.New(fmt.Sprintf("MySQL Connection failed with error: %v", err.Error()))
log.Error(err.Error())
return nil, err
}

sqlDB, _ := Db.DB()
// 限制最大空闲连接数、最大连接数和连接的生命周期
sqlDB.SetMaxIdleConns(50)
sqlDB.SetMaxOpenConns(100)
sqlDB.SetConnMaxLifetime(time.Hour)
return Db
return Db, nil
}
105 changes: 28 additions & 77 deletions server/controller/db/mysql/migrator/migrator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,18 +17,18 @@
package migrator

import (
"errors"
"fmt"

"github.com/op/go-logging"
"gorm.io/gorm"

"github.com/deepflowio/deepflow/server/controller/db/mysql"
. "github.com/deepflowio/deepflow/server/controller/db/mysql/common"
. "github.com/deepflowio/deepflow/server/controller/db/mysql/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql/migration"
"github.com/deepflowio/deepflow/server/controller/db/mysql/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql/migrator/table"
)

var log = logging.MustGetLogger("db.migrator.mysql")
var log = logging.MustGetLogger("db.mysql.migrator")

// if configured database does not exist, it is considered a new deployment, will create database and init tables;
// if configured database exists, but db_version table does not exist, it is also considered a new deployment,
Expand All @@ -38,89 +38,40 @@ var log = logging.MustGetLogger("db.migrator.mysql")
// if configured database exists, and db_version table exists, check whether db_version is the latest version
//
// and upgrade based the result.
func MigrateMySQL(cfg MySqlConfig) bool {
db := mysql.GetConnectionWithoutDatabase(cfg)
if db == nil {
return false
}
databaseExisted, err := CreateDatabaseIfNotExists(db, cfg.Database)
if err != nil {
log.Errorf("database: %s is not ready: %v", cfg.Database, err)
return false
}

db = mysql.GetConnectionWithDatabase(cfg)
if db == nil {
return false
}
if !databaseExisted {
return DropDatabaseIfInitTablesFailed(db, cfg.Database)
} else {
var dbVersionTable string
err = db.Raw(fmt.Sprintf("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'", cfg.Database, migration.DB_VERSION_TABLE)).Scan(&dbVersionTable).Error
if err != nil {
log.Errorf("check db_version table failed: %v", err)
return false
}
if dbVersionTable == "" {
return InitTablesWithoutRollBack(db, cfg.Database)
} else {
return UpgradeIfDBVersionNotLatest(db, cfg)
}
func MigrateMySQL(cfg config.MySqlConfig) error {
if databaseExisted, err := CreateDatabase(cfg); err != nil {
return err
} else if databaseExisted {
return table.UpgradeDatabase(cfg)
}
return nil
}

func InitTablesWithoutRollBack(db *gorm.DB, database string) bool {
log.Info("init db tables without rollback")
err := InitTables(db)
func CreateDatabase(cfg config.MySqlConfig) (databaseExisted bool, err error) {
db, err := mysql.GetConnectionWithoutDatabase(cfg)
if err != nil {
return false
return
}
return true
}

func UpgradeIfDBVersionNotLatest(db *gorm.DB, cfg MySqlConfig) bool {
log.Info("upgrade if db version is not the latest")
var version string
err := db.Raw(fmt.Sprintf("SELECT version FROM %s", migration.DB_VERSION_TABLE)).Scan(&version).Error
databaseExisted, err = common.CreateDatabaseIfNotExists(db, cfg.Database)
if err != nil {
log.Errorf("check db version failed: %v", err)
return false
err = errors.New(fmt.Sprintf("database: %s is not ready: %v", cfg.Database, err))
log.Error(err.Error())
return
}
log.Infof("current db version: %s, expected db version: %s", version, migration.DB_VERSION_EXPECTED)
if version == "" {
if cfg.DropDatabaseEnabled {
return RecreateDatabaseAndInitTables(db, cfg)
} else {
log.Errorf("current db version is null, need manual handling")
return false
}
} else if version != migration.DB_VERSION_EXPECTED {
err = ExecuteIssus(db, version)
if !databaseExisted {
db, err = mysql.GetConnectionWithDatabase(cfg)
if err != nil {
return false
return
}
return true
err = table.DropDatabaseIfInitTablesFailed(db, cfg.Database)
}
return true
return
}

func RecreateDatabaseAndInitTables(db *gorm.DB, cfg MySqlConfig) bool {
log.Info("recreate database and init tables")
DropDatabase(db, cfg.Database)
db = mysql.GetConnectionWithoutDatabase(cfg)
if db == nil {
return false
}
err := CreateDatabase(db, cfg.Database)
func DropDatabase(cfg config.MySqlConfig) error {
db, err := mysql.GetConnectionWithDatabase(cfg)
if err != nil {
log.Errorf("created database %s failed: %v", cfg.Database, err)
return false
}

db = mysql.GetConnectionWithDatabase(cfg)
if db == nil {
return false
return err
}
return DropDatabaseIfInitTablesFailed(db, cfg.Database)
return common.DropDatabase(db, cfg.Database)
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/deepflowio/deepflow/server/controller/db/mysql/migrator
module github.com/deepflowio/deepflow/server/controller/db/mysql/migrator/table

go 1.18
128 changes: 128 additions & 0 deletions server/controller/db/mysql/migrator/table/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package table

import (
"errors"
"fmt"

"github.com/op/go-logging"
"gorm.io/gorm"

"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/db/mysql/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql/migration"
)

var log = logging.MustGetLogger("db.mysql.migrator.table")

func UpgradeDatabase(cfg config.MySqlConfig) error {
db, err := mysql.GetConnectionWithDatabase(cfg)
if err != nil {
return err
}
var dbVersionTable string
err = db.Raw(fmt.Sprintf("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'", cfg.Database, migration.DB_VERSION_TABLE)).Scan(&dbVersionTable).Error
if err != nil {
err = errors.New(fmt.Sprintf("check db_version table failed: %v", err))
log.Error(err.Error())
return err
}
if dbVersionTable == "" {
return initTablesWithoutRollBack(db, cfg.Database)
} else {
return upgradeIfDBVersionNotLatest(db, cfg)
}
}

func initTablesWithoutRollBack(db *gorm.DB, database string) error {
log.Info("init db tables without rollback")
return initTables(db)
}

func upgradeIfDBVersionNotLatest(db *gorm.DB, cfg config.MySqlConfig) error {
log.Info("upgrade if db version is not the latest")
var version string
err := db.Raw(fmt.Sprintf("SELECT version FROM %s", migration.DB_VERSION_TABLE)).Scan(&version).Error
if err != nil {
err = errors.New(fmt.Sprintf("check db version failed: %v", err))
log.Error(err.Error())
return err
}
log.Infof("current db version: %s, expected db version: %s", version, migration.DB_VERSION_EXPECTED)
if version == "" {
if cfg.DropDatabaseEnabled {
return recreateDatabaseAndInitTables(db, cfg)
} else {
err = errors.New("current db version is null, need manual handling")
log.Error(err.Error())
return err
}
} else if version != migration.DB_VERSION_EXPECTED {
return common.ExecuteIssus(db, version)
}
return nil
}

func recreateDatabaseAndInitTables(db *gorm.DB, cfg config.MySqlConfig) error {
log.Info("recreate database and init tables")
common.DropDatabase(db, cfg.Database)
db, err := mysql.GetConnectionWithoutDatabase(cfg)
if err != nil {
return err
}
err = common.CreateDatabase(db, cfg.Database)
if err != nil {
err = errors.New(fmt.Sprintf("created database %s failed: %v", cfg.Database, err))
log.Error(err.Error())
return err
}

db, err = mysql.GetConnectionWithDatabase(cfg)
if err != nil {
return err
}
return DropDatabaseIfInitTablesFailed(db, cfg.Database)
}


func DropDatabaseIfInitTablesFailed(db *gorm.DB, database string) error {
log.Info("drop database if init tables failed")
err := initTables(db)
if err != nil {
err := common.DropDatabase(db, database)
if err != nil {
err = errors.New(fmt.Sprintf("drop database %s failed: %v", database, err))
log.Error(err.Error())
}
return err
}
return nil
}

func initTables(db *gorm.DB) error {
log.Info("init db tables start")
if err := common.InitEETables(db); err != nil {
return err
}
if err := common.InitDBVersion(db); err != nil {
return err
}
log.Info("init db tables success")
return nil
}
Loading

0 comments on commit eb96031

Please sign in to comment.