Skip to content

Commit

Permalink
support split table for internal storage
Browse files Browse the repository at this point in the history
Signed-off-by: calvin <wen.chen@daocloud.io>
  • Loading branch information
calvin committed Jan 31, 2024
1 parent 89b8466 commit 27a0499
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 53 deletions.
6 changes: 3 additions & 3 deletions examples/pediacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ metadata:
name: cluster-example
spec:
apiserver: "https://10.30.43.43:6443"
caData:
caData:
tokenData:
certData:
keyData:
certData:
keyData:
syncResources:
- group: apps
resources:
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)

return &StorageFactory{db}, nil
return &StorageFactory{
db: db,
AutoMigration: cfg.AutoMigration,
DivisionPolicy: cfg.DivisionPolicy,
Mapper: cfg.Mapper,
}, nil
}

func newLogger(cfg *Config) (logger.Interface, error) {
Expand Down
62 changes: 36 additions & 26 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type ResourceStorage struct {
db *gorm.DB
codec runtime.Codec
table string

storageGroupResource schema.GroupResource
storageVersion schema.GroupVersion
Expand Down Expand Up @@ -83,7 +84,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
resource.DeletedAt = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Create(&resource)
result := s.db.WithContext(ctx).Table(s.table).Create(&resource)
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand Down Expand Up @@ -116,14 +117,19 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).Updates(updatedResource)
result := s.db.WithContext(ctx).
Model(&Resource{}).
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).
Table(s.table).
Updates(updatedResource)

return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand All @@ -144,14 +150,17 @@ func (c *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
}

func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
return s.db.Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
}).Delete(&Resource{})
return s.db.Model(&Resource{}).
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
}).
Table(s.table).
Delete(&Resource{})
}

func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error {
Expand All @@ -167,14 +176,15 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
}

func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
return s.db.WithContext(ctx).Model(&Resource{}).Table(s.table).Select("object").
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
Expand All @@ -199,7 +209,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

query := s.db.WithContext(ctx).Model(&Resource{})
query := s.db.WithContext(ctx).Model(&Resource{}).Table(s.table)
query = query.Where(map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
Expand Down
105 changes: 82 additions & 23 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package internalstorage
import (
"context"
"fmt"
"strings"
"sync"

"gorm.io/gorm"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -11,6 +13,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
)

var mutex sync.Mutex

type StorageFactory struct {
db *gorm.DB
AutoMigration *bool
Expand All @@ -19,34 +23,54 @@ type StorageFactory struct {
}

func (s *StorageFactory) AutoMigrate() error {
return nil
}

func (s *StorageFactory) GetSupportedRequestVerbs() []string {
return []string{"get", "list"}
}

func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
mutex.Lock()
defer mutex.Unlock()

var table string
if s.AutoMigration != nil && *s.AutoMigration {
switch s.DivisionPolicy {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return err
}
case "", DivisionPolicyNone:
table = "resources"

if exist := s.db.Migrator().HasTable(table); !exist {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return nil, err
}
}
case DivisionPolicyGroupResource:
gvr := schema.GroupVersionResource{
Group: config.StorageGroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.StorageGroupResource.Resource,
}

}
table = GenerateTableFor(gvr)

if exist := s.db.Migrator().HasTable(table); !exist {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return nil, err
}

if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return err
err := s.db.Migrator().RenameTable("resources", table)
if err != nil {
return nil, err
}
}
}
}

return nil
}

func (s *StorageFactory) GetSupportedRequestVerbs() []string {
return []string{"get", "list"}
}

func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
return &ResourceStorage{
db: s.db,
codec: config.Codec,
table: table,

storageGroupResource: config.StorageGroupResource,
storageVersion: config.StorageVersion,
Expand All @@ -65,11 +89,23 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes

func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
var resources []Resource
result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
mutex.Lock()
tables, err := f.db.Migrator().GetTables()
if err != nil {
mutex.Unlock()
return nil, err
}
mutex.Unlock()
for _, table := range tables {
var tableResources []Resource
result := f.db.WithContext(ctx).Table(table).Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&tableResources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
}

resources = append(resources, tableResources...)
}

resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
Expand All @@ -91,12 +127,25 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
}

func (f *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
result := f.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
return InterpretDBError(cluster, result.Error)
mutex.Lock()
tables, err := f.db.Migrator().GetTables()
if err != nil {
mutex.Unlock()
return err
}
mutex.Unlock()

for _, table := range tables {
result := f.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
if result.Error != nil {
return InterpretDBError(cluster, result.Error)
}
}
return nil
}

func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
result := s.db.WithContext(ctx).Where(map[string]interface{}{
result := s.db.WithContext(ctx).Table(GenerateTableFor(gvr)).Where(map[string]interface{}{
"cluster": cluster,
"group": gvr.Group,
"version": gvr.Version,
Expand All @@ -116,3 +165,13 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna
func (s *StorageFactory) PrepareCluster(cluster string) error {
return nil
}

// GenerateTableFor return table name using gvr string
func GenerateTableFor(gvr schema.GroupVersionResource) string {
if gvr.Group == "" {
return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource)
}

group := strings.ReplaceAll(gvr.Group, ".", "_")
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
}

0 comments on commit 27a0499

Please sign in to comment.