Skip to content

Commit

Permalink
support split table for internal storage
Browse files Browse the repository at this point in the history
Signed-off-by: calvin <wen.chen@daocloud.io>
  • Loading branch information
calvin committed Mar 25, 2024
1 parent 29c0306 commit 723c0d2
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 91 deletions.
30 changes: 5 additions & 25 deletions pkg/storage/internalstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
"gorm.io/gorm/logger"
"k8s.io/klog/v2"

clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
)

const (
Expand All @@ -29,9 +27,9 @@ const (
type DivisionPolicy string

const (
DivisionPolicyNone DivisionPolicy = "None"
DivisionPolicyGroupResource DivisionPolicy = "GroupResource"
DivisionPolicyCustom DivisionPolicy = "Custom"
DivisionPolicyNone DivisionPolicy = "None"
DivisionPolicyGroupVersionResource DivisionPolicy = "GVR"
DivisionPolicyCustom DivisionPolicy = "Custom"
)

type Config struct {
Expand All @@ -58,30 +56,12 @@ type Config struct {

Params map[string]string `yaml:"params"`

AutoMigration *bool `yaml:"autoMigration"` // If set to false, no tables will be created
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`
Mapper []ResourceMapper `yaml:"mapper"` // Only DivisionPolicy is DivisionPolicyCustom it need to specify the mapping between resource and table
SkipAutoMigration bool `yaml:"skipAutoMigration"` // If set to false, no tables will be created
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`

Log *LogConfig `yaml:"log"`
}

type ResourceMapper struct {
Table *Table `yaml:"table"`
Resources []clusterv1alpha2.ClusterGroupResources `yaml:"resources"`
}

type Table struct {
Name string `yaml:"name"`
ExtraFields []ExtraField `yaml:"extraFields"`
}

type ExtraField struct {
Name string `yaml:"name"`
PlainPath string `yaml:"plainPath"`
Type string `yaml:"type"`
Index string `yaml:"index"`
}

type LogConfig struct {
Stdout bool `yaml:"stdout"`
Level string `yaml:"level"`
Expand Down
14 changes: 13 additions & 1 deletion pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,19 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)

return &StorageFactory{db}, nil
if !cfg.SkipAutoMigration && (cfg.DivisionPolicy == DivisionPolicyNone || cfg.DivisionPolicy == "") {
if exist := db.Migrator().HasTable("resources"); !exist {
if err := db.AutoMigrate(&Resource{}); err != nil {
return nil, err
}
}
}

return &StorageFactory{
db: db,
SkipAutoMigration: cfg.SkipAutoMigration,
DivisionPolicy: cfg.DivisionPolicy,
}, nil
}

func newLogger(cfg *Config) (logger.Interface, error) {
Expand Down
50 changes: 27 additions & 23 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
)

type ResourceStorage struct {
db *gorm.DB
codec runtime.Codec

db *gorm.DB
codec runtime.Codec
storageGroupResource schema.GroupResource
storageVersion schema.GroupVersion
memoryVersion schema.GroupVersion
Expand Down Expand Up @@ -116,14 +115,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).Updates(updatedResource)
result := s.db.WithContext(ctx).
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).
Updates(updatedResource)

return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand All @@ -143,8 +145,8 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
return &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}, nil
}

func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
return s.db.Model(&Resource{}).Where(map[string]interface{}{
func (s *ResourceStorage) deleteObject(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
Expand All @@ -160,21 +162,23 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
return err
}

if result := s.deleteObject(cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil {
if result := s.deleteObject(ctx, cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil {
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}
return nil
}

func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
return s.db.WithContext(ctx).
Select("object").
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
Expand All @@ -199,7 +203,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

query := s.db.WithContext(ctx).Model(&Resource{})
query := s.db.WithContext(ctx)
query = query.Where(map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/internalstorage/resource_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestResourceStorage_deleteObject(t *testing.T) {
postgreSQL := postgresDB.Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL(
func(tx *gorm.DB) *gorm.DB {
rs := newTestResourceStorage(tx, test.resource)
return rs.deleteObject(test.cluster, test.namespace, test.resourceName)
return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName)
})

if postgreSQL != test.expected.postgres {
Expand All @@ -354,7 +354,7 @@ func TestResourceStorage_deleteObject(t *testing.T) {
mysqlSQL := mysqlDBs[version].Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL(
func(tx *gorm.DB) *gorm.DB {
rs := newTestResourceStorage(tx, test.resource)
return rs.deleteObject(test.cluster, test.namespace, test.resourceName)
return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName)
})

if mysqlSQL != test.expected.mysql {
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestResourceStorage_Update(t *testing.T) {

func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage {
return &ResourceStorage{
db: db,
db: db.Table("resources").Model(&Resource{}),
storageGroupResource: storageGVK.GroupResource(),
storageVersion: storageGVK.GroupVersion(),
}
Expand Down
137 changes: 99 additions & 38 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internalstorage
import (
"context"
"fmt"
"strings"

"gorm.io/gorm"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -12,30 +13,12 @@ import (
)

type StorageFactory struct {
db *gorm.DB
AutoMigration *bool
DivisionPolicy DivisionPolicy
Mapper []ResourceMapper
db *gorm.DB
SkipAutoMigration bool
DivisionPolicy DivisionPolicy
}

func (s *StorageFactory) AutoMigrate() error {
if s.AutoMigration != nil && *s.AutoMigration {
switch s.DivisionPolicy {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return err
}
case "", DivisionPolicyNone:
case DivisionPolicyGroupResource:

}

if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return err
}
}
}

return nil
}

Expand All @@ -44,10 +27,37 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string {
}

func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
return &ResourceStorage{
db: s.db,
codec: config.Codec,
gvr := schema.GroupVersionResource{
Group: config.StorageGroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.StorageGroupResource.Resource,
}
table := s.tableName(gvr)

var model interface{}
switch s.DivisionPolicy {
case DivisionPolicyGroupVersionResource:
model = &GroupVersionResource{}
if !s.SkipAutoMigration {
if exist := s.db.Migrator().HasTable(table); !exist {
if err := s.db.AutoMigrate(&GroupVersionResource{}); err != nil {
return nil, err
}

if err := s.db.Migrator().RenameTable("group_version_resources", table); err != nil {
if !s.db.Migrator().HasTable(table) {
return nil, err
}
}
}
}
default:
model = &Resource{}
}

return &ResourceStorage{
db: s.db.Table(table).Model(model),
codec: config.Codec,
storageGroupResource: config.StorageGroupResource,
storageVersion: config.StorageVersion,
memoryVersion: config.MemoryVersion,
Expand All @@ -64,12 +74,21 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
}

func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
tables, err := s.db.Migrator().GetTables()
if err != nil {
return nil, err
}

var resources []Resource
result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
for _, table := range tables {
result := s.db.WithContext(ctx).
Table(table).
Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
}
}

resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
Expand All @@ -91,18 +110,41 @@ func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
}

func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
result := s.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
return InterpretDBError(cluster, result.Error)
tables, err := s.db.Migrator().GetTables()
if err != nil {
return err
}

for _, table := range tables {
result := s.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
if result.Error != nil {
return InterpretDBError(cluster, result.Error)
}
}

return nil
}

func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
result := s.db.WithContext(ctx).Where(map[string]interface{}{
"cluster": cluster,
"group": gvr.Group,
"version": gvr.Version,
"resource": gvr.Resource,
}).Delete(&Resource{})
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
err := s.db.Transaction(func(db *gorm.DB) error {
result := s.db.WithContext(ctx).
Table(s.tableName(gvr)).
Where(map[string]interface{}{
"cluster": cluster,
"group": gvr.Group,
"version": gvr.Version,
"resource": gvr.Resource,
}).
Delete(&Resource{})

if result.Error != nil {
return result.Error
}

return nil
})

return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), err)
}

func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) {
Expand All @@ -116,3 +158,22 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna
func (s *StorageFactory) PrepareCluster(cluster string) error {
return nil
}

// GenerateTableFor return table name using gvr string
func GenerateTableFor(gvr schema.GroupVersionResource) string {
if gvr.Group == "" {
return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource)
}

group := strings.ReplaceAll(gvr.Group, ".", "_")
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
}

func (s *StorageFactory) tableName(gvr schema.GroupVersionResource) string {
table := "resources"
if s.DivisionPolicy == DivisionPolicyGroupVersionResource {
table = GenerateTableFor(gvr)
}

return table
}
Loading

0 comments on commit 723c0d2

Please sign in to comment.