From caef7af5fa2945eb46eb5c0f085056cc2ca11306 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Tue, 8 Nov 2022 17:50:05 +0800 Subject: [PATCH] refactor: refactor cluster package --- server/cluster/cluster.go | 1032 +++-------------- server/cluster/data/table_manager.go | 28 - server/cluster/data/topology_manager.go | 28 - server/cluster/error.go | 2 +- server/cluster/manager.go | 107 +- server/cluster/node.go | 8 +- server/cluster/schema.go | 28 - server/cluster/shard.go | 53 - server/cluster/table.go | 37 - server/cluster/table_manager.go | 287 +++++ ...{manager_test.go => table_manager_test.go} | 69 +- server/cluster/topology_manager.go | 427 +++++++ server/cluster/types.go | 66 +- server/coordinator/eventdispatch/dispatch.go | 20 +- .../eventdispatch/dispatch_impl.go | 14 +- server/coordinator/procedure/common_test.go | 8 +- .../procedure/create_drop_table_test.go | 13 +- server/coordinator/procedure/create_table.go | 66 +- server/coordinator/procedure/drop_table.go | 60 +- server/coordinator/procedure/factory.go | 8 +- server/coordinator/procedure/scatter.go | 57 +- server/coordinator/procedure/scatter_test.go | 66 +- server/coordinator/scheduler.go | 30 +- server/server.go | 8 +- server/service/grpc/service.go | 69 +- server/storage/meta.go | 2 +- server/storage/storage_impl.go | 4 +- server/storage/storage_test.go | 2 +- server/storage/types.go | 8 +- 29 files changed, 1232 insertions(+), 1375 deletions(-) delete mode 100644 server/cluster/data/table_manager.go delete mode 100644 server/cluster/data/topology_manager.go delete mode 100644 server/cluster/schema.go delete mode 100644 server/cluster/shard.go create mode 100644 server/cluster/table_manager.go rename server/cluster/{manager_test.go => table_manager_test.go} (76%) create mode 100644 server/cluster/topology_manager.go diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 6944bdde..51e7bdd1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -4,755 +4,186 @@ package cluster import ( "context" - "fmt" - "math/rand" "path" "sync" - "time" - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" + "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/id" "github.com/CeresDB/ceresmeta/server/storage" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" ) -type metaData struct { - cluster storage.Cluster - clusterView *clusterpb.ClusterTopology -} - type Cluster struct { clusterID storage.ClusterID // RWMutex is used to protect following fields. // TODO: Encapsulated maps as a specific struct. lock sync.RWMutex - metaData *metaData - // The two fields describes the whole topology of the cluster. - // TODO: merge `shardsCache` & `nodeShardsCache` into the whole topology. - shardsCache map[uint32]*Shard // shardID -> shard - nodeShardsCache map[string]*ShardsOfNode // nodeName -> shards of the node + metaData storage.Cluster - // Manage tables by schema. - schemasCache map[string]*Schema // schemaName -> schema + tableManager TableManager + topologyManager TopologyManager // Manage the registered nodes from heartbeat. - registeredNodesCache map[string]*RegisteredNode // nodeName -> node - - storage storage.Storage - kv clientv3.KV - schemaIDAlloc id.Allocator - tableIDAlloc id.Allocator - shardIDAlloc id.Allocator -} - -// FIXME: For now, not all the returned values by cluster methods are deep-copied, which may lead to data race, let's do deep copy for returned values. 
- -func (c *Cluster) GetClusterShardView() ([]*clusterpb.Shard, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - // FIXME: Should use the shardsCache & nodeShardsCache to build the view. - shardView := c.metaData.clusterView.ShardView - newShardView := make([]*clusterpb.Shard, 0, len(shardView)) - // TODO: We need to use the general deep copy tool method to replace. - for _, shard := range shardView { - copyShard := &clusterpb.Shard{ - Id: shard.Id, - ShardRole: shard.ShardRole, - Node: shard.Node, - } - newShardView = append(newShardView, copyShard) - } - return newShardView, nil -} + registeredNodesCache map[string]RegisteredNode // nodeName -> node -func (c *Cluster) GetClusterID() storage.ClusterID { - return c.clusterID + storage storage.Storage + kv clientv3.KV + shardIDAlloc id.Allocator } func NewCluster(meta storage.Cluster, storage storage.Storage, kv clientv3.KV, rootPath string, idAllocatorStep uint) *Cluster { + schemaIDAlloc := id.NewAllocatorImpl(kv, path.Join(rootPath, meta.Name, AllocSchemaIDPrefix), idAllocatorStep) + tableIDAlloc := id.NewAllocatorImpl(kv, path.Join(rootPath, meta.Name, AllocTableIDPrefix), idAllocatorStep) + // FIXME: Load ShardTopology when cluster create, pass exist ShardID to allocator. + shardIDAlloc := id.NewReusableAllocatorImpl([]uint64{}, MinShardID) + cluster := &Cluster{ clusterID: meta.ID, - metaData: &metaData{cluster: meta}, - shardsCache: map[uint32]*Shard{}, - nodeShardsCache: map[string]*ShardsOfNode{}, - schemasCache: map[string]*Schema{}, - registeredNodesCache: map[string]*RegisteredNode{}, - schemaIDAlloc: id.NewAllocatorImpl(kv, path.Join(rootPath, meta.Name, AllocSchemaIDPrefix), idAllocatorStep), - tableIDAlloc: id.NewAllocatorImpl(kv, path.Join(rootPath, meta.Name, AllocTableIDPrefix), idAllocatorStep), - // TODO: Load ShardTopology when cluster create, pass exist shardID to allocator. - shardIDAlloc: id.NewReusableAllocatorImpl([]uint64{}, MinShardID), - - storage: storage, - kv: kv, + metaData: meta, + tableManager: NewTableManagerImpl(storage, meta.ID, schemaIDAlloc, tableIDAlloc), + topologyManager: NewTopologyManagerImpl(storage, meta.ID, shardIDAlloc), + registeredNodesCache: map[string]RegisteredNode{}, + storage: storage, + kv: kv, + shardIDAlloc: shardIDAlloc, } return cluster } -func (c *Cluster) Name() string { - return c.metaData.cluster.Name +func (c *Cluster) GetClusterID() storage.ClusterID { + return c.clusterID } -// Initialize the cluster view and shard view of the cluster. -// It will be used when we create the cluster. -func (c *Cluster) init(ctx context.Context) error { - clusterView := storage.ClusterView{ - ClusterID: c.clusterID, - Version: 0, - State: storage.Empty, - ShardNodes: nil, - CreatedAt: uint64(time.Now().UnixMilli()), - } - - err := c.storage.CreateClusterView(ctx, storage.CreateClusterViewRequest{ClusterView: clusterView}) - if err != nil { - return errors.WithMessage(err, "create cluster view") - } - - clusterTopology := storage.ConvertClusterViewToPB(clusterView) - - c.metaData.clusterView = &clusterTopology - return nil +func (c *Cluster) Name() string { + return c.metaData.Name } -// Load data from storage to memory. 
-func (c *Cluster) Load(ctx context.Context) error { - c.lock.Lock() - defer c.lock.Unlock() +func (c *Cluster) GetTables(shardIDs []storage.ShardID, nodeName string) map[storage.ShardID]ShardTables { + shardTableIDs := c.topologyManager.GetTableIDs(shardIDs, nodeName) - shards, shardIDs, err := c.loadClusterViewLocked(ctx) - if err != nil { - return errors.WithMessage(err, "load cluster topology") - } - - shardTopologies, err := c.loadShardViewLocked(ctx, shardIDs) - if err != nil { - return errors.WithMessage(err, "load shard topology") - } - - schemas, err := c.loadSchemaLocked(ctx) - if err != nil { - return errors.WithMessage(err, "load schema") - } - - nodes, err := c.loadNodeLocked(ctx) - if err != nil { - return errors.WithMessage(err, "load node") - } + result := make(map[storage.ShardID]ShardTables, len(shardIDs)) - tables, err := c.loadTableLocked(ctx, schemas) - if err != nil { - return errors.WithMessage(err, "load table") - } - - if err := c.loadCacheLocked(shards, shardTopologies, schemas, nodes, tables); err != nil { - return errors.WithMessage(err, "load cache") - } - return nil -} - -func (c *Cluster) loadCacheLocked( - shards map[uint32][]*clusterpb.Shard, - shardTopologies map[uint32]*clusterpb.ShardTopology, - schemasLoaded map[string]*clusterpb.Schema, - nodesLoaded map[string]*clusterpb.Node, - tablesLoaded map[string]map[uint64]*clusterpb.Table, -) error { - // Load schemaCache: load all the schemas. - for _, schema := range schemasLoaded { - c.updateSchemaCacheLocked(schema) + schemas := c.tableManager.GetSchemas() + schemaByID := make(map[storage.SchemaID]storage.Schema) + for _, schema := range schemas { + schemaByID[schema.ID] = schema } - // Load schemasCache: load the tables. - for schemaName, tables := range tablesLoaded { + for shardID, shardTableID := range shardTableIDs { + tables := c.tableManager.GetTablesByIDs(shardTableID.TableIDs) + tableInfos := make([]TableInfo, 0, len(tables)) for _, table := range tables { - _, ok := c.schemasCache[schemaName] - if ok { - c.schemasCache[schemaName].tableMap[table.GetName()] = &Table{ - schema: schemasLoaded[schemaName], - meta: table, - } - } else { - c.schemasCache[schemaName] = &Schema{ - meta: schemasLoaded[schemaName], - tableMap: map[string]*Table{table.GetName(): { - schema: schemasLoaded[schemaName], - meta: table, - }}, - } - } - } - } - - // Update nodeShardsCache. - for shardID, shardPBs := range shards { - for _, shard := range shardPBs { - shardsOfNode, ok := c.nodeShardsCache[shard.GetNode()] + schema, ok := schemaByID[table.SchemaID] if !ok { - shardsOfNode = &ShardsOfNode{ - Endpoint: shard.GetNode(), - ShardIDs: []uint32{}, - } - c.nodeShardsCache[shard.GetNode()] = shardsOfNode - } - shardsOfNode.ShardIDs = append(shardsOfNode.ShardIDs, shardID) - } - } - - // Load registeredNodeCache. - for _, node := range nodesLoaded { - registerNode := NewRegisteredNode(node, []*ShardInfo{}) - c.registeredNodesCache[node.Name] = registerNode - } - - // Load shardsCache. 
- for shardID, shardTopology := range shardTopologies { - tables := make(map[uint64]*Table, len(shardTopology.TableIds)) - - for _, tableID := range shardTopology.TableIds { - for schemaName, tableMap := range tablesLoaded { - table, ok := tableMap[tableID] - if ok { - tables[tableID] = &Table{ - schema: schemasLoaded[schemaName], - meta: table, - } - } - } - } - // TODO: assert shardID - // TODO: check shard not found by shardID - shardMetaList := shards[shardID] - var nodeMetas []*clusterpb.Node - for _, shardMeta := range shardMetaList { - if node := c.registeredNodesCache[shardMeta.Node]; node != nil { - nodeMetas = append(nodeMetas, node.meta) + log.Warn("schema not exits", zap.Uint64("schemaID", uint64(table.SchemaID))) } + tableInfos = append(tableInfos, TableInfo{ + ID: table.ID, + Name: table.Name, + SchemaID: table.SchemaID, + SchemaName: schema.Name, + }) } - c.shardsCache[shardID] = &Shard{ - meta: shards[shardID], - nodes: nodeMetas, - tables: tables, - version: shardTopology.Version, + result[shardID] = ShardTables{ + Shard: ShardInfo{ + ID: shardTableID.ShardNode.ID, + Role: shardTableID.ShardNode.ShardRole, + Version: shardTableID.Version, + }, + Tables: tableInfos, } } - - return nil + return result } -func (c *Cluster) updateSchemaCacheLocked(schemaPB *clusterpb.Schema) *Schema { - schema := &Schema{meta: schemaPB, tableMap: make(map[string]*Table, 0)} - c.schemasCache[schemaPB.GetName()] = schema - return schema -} - -func (c *Cluster) updateTableCacheLocked(shardID uint32, schema *Schema, tablePB *clusterpb.Table) *Table { - table := &Table{meta: tablePB, schema: schema.meta, shardID: shardID} - schema.tableMap[tablePB.GetName()] = table - c.shardsCache[tablePB.GetShardId()].tables[table.GetID()] = table - return table -} - -func (c *Cluster) updateShardVersionLocked(shardID uint32, prevVersion, newVersion uint64) (*ShardVersionUpdate, error) { - shard, ok := c.shardsCache[shardID] - if !ok { - return nil, ErrShardNotFound - } - - if shard.version != prevVersion { - panic(fmt.Sprintf("shardId:%d, storage version:%d, memory version:%d", shardID, prevVersion, shard.version)) - } - - shard.version = newVersion - return &ShardVersionUpdate{ - ShardID: shardID, - CurrVersion: newVersion, - PrevVersion: prevVersion, - }, nil -} - -func (c *Cluster) getShardViewFromStorage(ctx context.Context, shardID uint32) (*clusterpb.ShardTopology, error) { - shardViewsResult, err := c.storage.ListShardViews(ctx, storage.ListShardViewsRequest{ - ClusterID: c.clusterID, - ShardIDs: []storage.ShardID{storage.ShardID(shardID)}, - }) +func (c *Cluster) DropTable(ctx context.Context, schemaName, tableName string) (DropTableResult, error) { + table, ok, err := c.tableManager.GetTable(schemaName, tableName) if err != nil { - return nil, errors.WithMessage(err, "get shard view from storage") - } - if len(shardViewsResult.ShardViews) != 1 { - return nil, ErrGetShardView.WithCausef("shard has more than one shard view, shardID:%d, shardViewsResult:%v", - shardID, shardViewsResult) + return DropTableResult{}, errors.WithMessage(err, "get table") } - shardView := storage.ConvertShardViewToPB(shardViewsResult.ShardViews[0]) - return &shardView, nil -} -func (c *Cluster) createTableOnShardLocked(ctx context.Context, shardID uint32, schema *Schema, tablePB *clusterpb.Table) (*CreateTableResult, error) { - // Update shardView in storage. 
- shardView, err := c.getShardViewFromStorage(ctx, shardID) - if err != nil { - return nil, err - } - shardView.TableIds = append(shardView.TableIds, tablePB.GetId()) - prevVersion := shardView.Version - shardView.Version = prevVersion + 1 - if err = c.storage.UpdateShardView(ctx, storage.PutShardViewRequest{ - ClusterID: c.clusterID, - ShardView: storage.ConvertShardViewPB(shardView), - LatestVersion: prevVersion, - }); err != nil { - return nil, errors.WithMessage(err, "put shard view") + if !ok { + return DropTableResult{}, ErrTableNotFound } - // Update tableCache in memory. - table, shardVersion, err := c.createTableInCache(shardID, schema, tablePB, prevVersion, shardView.Version) + // Drop table. + err = c.tableManager.DropTable(ctx, schemaName, tableName) if err != nil { - return nil, errors.WithMessage(err, "create table in cache") + return DropTableResult{}, errors.WithMessagef(err, "table manager drop table") } - return &CreateTableResult{ - Table: table, - ShardVersionUpdate: shardVersion, - }, nil -} - -func (c *Cluster) createTableInCache(shardID uint32, schema *Schema, tablePB *clusterpb.Table, prevVersion, newVersion uint64) (*Table, *ShardVersionUpdate, error) { - table := c.updateTableCacheLocked(shardID, schema, tablePB) - shardVersion, err := c.updateShardVersionLocked(shardID, prevVersion, newVersion) + // Remove dropped table in shard view. + updateVersion, err := c.topologyManager.RemoveTable(ctx, table.ID) if err != nil { - return nil, nil, errors.WithMessage(err, "update shard view version") + return DropTableResult{}, errors.WithMessagef(err, "topology manager drop table") } - return table, shardVersion, nil -} -func (c *Cluster) dropTableUpdateCacheLocked(ctx context.Context, shardID uint32, schema *Schema, table *Table) (*DropTableResult, error) { - // Update shardView in storage. - shardView, err := c.getShardViewFromStorage(ctx, shardID) - if err != nil { - return nil, err - } - - // Remove table in shardView. - found := false - for i, id := range shardView.TableIds { - if id == table.GetID() { - found = true - shardView.TableIds = append(shardView.TableIds[:i], shardView.TableIds[i+1:]...) - break - } - } - - if !found { - panic(fmt.Sprintf("shard view dose not contain table, schema:%s, shard view:%v, table id:%d", schema.GetName(), shardView, table.GetID())) - } - - prevVersion := shardView.Version - shardView.Version = prevVersion + 1 - if err = c.storage.UpdateShardView(ctx, storage.PutShardViewRequest{ - ClusterID: c.clusterID, - ShardView: storage.ConvertShardViewPB(shardView), - LatestVersion: prevVersion, - }); err != nil { - return nil, errors.WithMessage(err, "put shard view") - } - - // Update tableCache in memory. - shardVersion, err := c.dropTableInCache(shardID, schema, table, prevVersion, shardView.Version) - if err != nil { - return nil, errors.WithMessage(err, "drop table in cache") - } - - return &DropTableResult{ - ShardVersionUpdate: shardVersion, + return DropTableResult{ + ShardVersionUpdate: updateVersion, }, nil } -func (c *Cluster) dropTableInCache(shardID uint32, schema *Schema, table *Table, prevVersion, newVersion uint64) (*ShardVersionUpdate, error) { - // Drop table in schemaCache. - schema.dropTableLocked(table.GetName()) - // Drop table in shardCache. - shard, err := c.getShardByIDLocked(shardID) - if err != nil { - return nil, errors.WithMessage(err, "update shard view version") - } - shard.dropTableLocked(table.GetID()) - // Update shard view version. 
- shardVersion, err := c.updateShardVersionLocked(shardID, prevVersion, newVersion) - if err != nil { - return nil, errors.WithMessage(err, "update shard view version") - } - - return shardVersion, nil -} - -func (c *Cluster) getTableShardIDLocked(tableID uint64) (uint32, error) { - for id, shard := range c.shardsCache { - if _, ok := shard.tables[tableID]; ok { - return id, nil - } - } - return 0, ErrShardNotFound.WithCausef("get table shardID, tableID:%d", tableID) -} - -func (c *Cluster) GetTables(_ context.Context, shardIDs []uint32, nodeName string) (map[uint32]*ShardTablesWithRole, error) { - // TODO: refactor more fine-grained locks - c.lock.RLock() - defer c.lock.RUnlock() - - shardTables := make(map[uint32]*ShardTablesWithRole, len(shardIDs)) - for _, shardID := range shardIDs { - shard, ok := c.shardsCache[shardID] - if !ok { - return nil, ErrShardNotFound.WithCausef("shardID:%d", shardID) - } - - shardRole := clusterpb.ShardRole_FOLLOWER - found := false - for i, n := range shard.nodes { - if nodeName == n.GetName() { - found = true - shardRole = shard.meta[i].ShardRole - break - } - } - if !found { - return nil, ErrNodeNotFound.WithCausef("nodeName not found in current shard, shardID:%d, nodeName:%s", shardID, nodeName) - } - - tables := make([]*Table, 0, len(shard.tables)) - for _, table := range shard.tables { - tables = append(tables, table) - } - shardTables[shardID] = &ShardTablesWithRole{shard: &ShardInfo{ - ID: shardID, - Role: shardRole, - Version: shard.version, - }, tables: tables} - } - - return shardTables, nil -} - -func (c *Cluster) DropTable(ctx context.Context, schemaName, tableName string) (*DropTableResult, error) { - c.lock.Lock() - defer c.lock.Unlock() - - schema, exists := c.getSchemaLocked(schemaName) - if !exists { - return nil, ErrSchemaNotFound.WithCausef("schemaName:%s", schemaName) - } - - table, ok := schema.getTable(tableName) - if !ok { - return nil, ErrTableNotFound - } - - if err := c.storage.DeleteTable(ctx, storage.DeleteTableRequest{ - ClusterID: c.clusterID, - SchemaID: storage.SchemaID(schema.GetID()), - TableName: tableName, - }); err != nil { - return nil, errors.WithMessagef(err, "storage drop table, clusterID:%d, schema:%v, tableName:%s", - c.clusterID, schema, tableName) - } - - shardID := table.GetShardID() - - // Update shardView in storage. - result, err := c.dropTableUpdateCacheLocked(ctx, shardID, schema, table) - if err != nil { - return nil, errors.WithMessagef(err, "drop table update cache, clusterID:%d, schema:%v, tableName:%s", - c.clusterID, schema, tableName) - } - return result, nil -} - // GetOrCreateSchema the second output parameter bool: Returns true if the schema was newly created. -func (c *Cluster) GetOrCreateSchema(ctx context.Context, schemaName string) (*Schema, bool, error) { - c.lock.Lock() - defer c.lock.Unlock() - - // Check if provided schema exists. - s, exists := c.getSchemaLocked(schemaName) - if exists { - return s, true, nil - } - - schemaID, err := c.allocSchemaID(ctx) - if err != nil { - return nil, false, errors.WithMessagef(err, "cluster AllocSchemaID, schemaName:%s", schemaName) - } - - // Save schema in storage. 
- schema := storage.Schema{ - ID: storage.SchemaID(schemaID), - ClusterID: c.clusterID, - Name: schemaName, - CreatedAt: uint64(time.Now().UnixMilli()), - } - schemaPB := storage.ConvertSchemaToPB(schema) - err = c.storage.CreateSchema(ctx, storage.CreateSchemaRequest{ - ClusterID: c.clusterID, - Schema: schema, - }) - if err != nil { - return nil, false, errors.WithMessage(err, "cluster CreateSchema") - } - - // Update schemasCache in memory. - s = c.updateSchemaCacheLocked(&schemaPB) - return s, false, nil -} - -func (c *Cluster) GetTable(ctx context.Context, schemaName, tableName string) (*Table, bool, error) { - c.lock.RLock() - schema, ok := c.schemasCache[schemaName] - if !ok { - c.lock.RUnlock() - return nil, false, ErrSchemaNotFound.WithCausef("schemaName:%s", schemaName) - } - - table, exists := schema.getTable(tableName) - if exists { - c.lock.RUnlock() - return table, true, nil - } - c.lock.RUnlock() - - // Search Table in storage. - tableResult, err := c.storage.GetTable(ctx, storage.GetTableRequest{ - ClusterID: c.clusterID, - SchemaID: storage.SchemaID(schema.GetID()), - TableName: tableName, - }) - if err != nil { - return nil, false, errors.WithMessage(err, "get table from storage") - } - - if !tableResult.Exists { - return nil, false, nil - } - - tablePB := storage.ConvertTableToPB(tableResult.Table) - if exists { - c.lock.Lock() - defer c.lock.Unlock() - - shardID, err := c.getTableShardIDLocked(uint64(tableResult.Table.ID)) - if err != nil { - return nil, false, errors.WithMessage(err, "get shard id") - } - table = c.updateTableCacheLocked(shardID, schema, &tablePB) - return table, true, nil - } - - return nil, false, nil +func (c *Cluster) GetOrCreateSchema(ctx context.Context, schemaName string) (storage.Schema, bool, error) { + return c.tableManager.GetOrCreateSchema(ctx, schemaName) } -func (c *Cluster) CreateTable(ctx context.Context, nodeName string, schemaName string, tableName string) (*CreateTableResult, error) { - c.lock.Lock() - defer c.lock.Unlock() - - // Check provided schema if exists. - schema, exists := c.getSchemaLocked(schemaName) - if !exists { - return nil, ErrSchemaNotFound.WithCausef("schemaName:%s", schemaName) - } - - // check if exists - _, exists = c.getTableLocked(schemaName, tableName) - if exists { - return nil, ErrTableAlreadyExists - } - - shardID, err := c.pickOneShardOnNode(nodeName) - if err != nil { - return nil, errors.WithMessagef(err, "pick one shard on node, clusterName:%s, schemaName:%s, tableName:%s, nodeName:%s", c.Name(), schemaName, tableName, nodeName) - } - - // Alloc table id. - tableID, err := c.allocTableID(ctx) - if err != nil { - return nil, errors.WithMessagef(err, "alloc table id, schemaName:%s, tableName:%s", schemaName, tableName) - } - - // Save table in storage. 
- tablePB := &clusterpb.Table{Id: tableID, Name: tableName, SchemaId: schema.GetID(), ShardId: shardID} - err = c.storage.CreateTable(ctx, storage.CreateTableRequest{ - ClusterID: c.clusterID, - SchemaID: storage.SchemaID(schema.GetID()), - Table: storage.ConvertTablePB(tablePB), - }) - if err != nil { - return nil, errors.WithMessage(err, "storage create table") - } - result, err := c.createTableOnShardLocked(ctx, shardID, schema, tablePB) - if err != nil { - return nil, errors.WithMessagef(err, "create table update shard view, clusterName:%s, schemaName:%s, tableName:%s, nodeName:%s", c.Name(), schemaName, tableName, nodeName) - } - - return result, nil +func (c *Cluster) GetTable(schemaName, tableName string) (storage.Table, bool, error) { + return c.tableManager.GetTable(schemaName, tableName) } -func (c *Cluster) loadClusterViewLocked(ctx context.Context) (map[uint32][]*clusterpb.Shard, []storage.ShardID, error) { - clusterViewResult, err := c.storage.GetClusterView(ctx, storage.GetClusterViewRequest{ - ClusterID: c.clusterID, - }) +func (c *Cluster) CreateTable(ctx context.Context, nodeName string, schemaName string, tableName string) (CreateTableResult, error) { + _, exists, err := c.tableManager.GetTable(schemaName, tableName) if err != nil { - return nil, nil, errors.WithMessage(err, "get cluster view") - } - clusterView := storage.ConvertClusterViewToPB(clusterViewResult.ClusterView) - c.metaData.clusterView = &clusterView - - if c.metaData.clusterView == nil { - return nil, nil, ErrClusterViewNotFound.WithCausef("cluster:%v", c) - } - - shardMap := map[uint32][]*clusterpb.Shard{} - for _, shard := range c.metaData.clusterView.ShardView { - shardMap[shard.Id] = append(shardMap[shard.Id], shard) - } - - shardIDs := make([]storage.ShardID, 0, len(shardMap)) - for id := range shardMap { - shardIDs = append(shardIDs, storage.ShardID(id)) + return CreateTableResult{}, err } - return shardMap, shardIDs, nil -} - -func (c *Cluster) loadShardViewLocked(ctx context.Context, shardIDs []storage.ShardID) (map[uint32]*clusterpb.ShardTopology, error) { - shardViewsResult, err := c.storage.ListShardViews(ctx, storage.ListShardViewsRequest{ - ClusterID: c.clusterID, - ShardIDs: shardIDs, - }) - if err != nil { - return nil, errors.WithMessage(err, "list shard views") - } - shardTopologyMap := make(map[uint32]*clusterpb.ShardTopology, len(shardIDs)) - for _, shardView := range shardViewsResult.ShardViews { - topology := storage.ConvertShardViewToPB(shardView) - shardTopologyMap[topology.ShardId] = &topology + if exists { + return CreateTableResult{}, ErrTableAlreadyExists } - return shardTopologyMap, nil -} -func (c *Cluster) loadSchemaLocked(ctx context.Context) (map[string]*clusterpb.Schema, error) { - schemasResult, err := c.storage.ListSchemas(ctx, storage.ListSchemasRequest{ClusterID: c.clusterID}) + // Create table in table manager. 
+ table, err := c.tableManager.CreateTable(ctx, schemaName, tableName) if err != nil { - return nil, errors.WithMessage(err, "list schemas") - } - schemaMap := make(map[string]*clusterpb.Schema, len(schemasResult.Schemas)) - for _, schema := range schemasResult.Schemas { - schemaPB := storage.ConvertSchemaToPB(schema) - schemaMap[schema.Name] = &schemaPB + return CreateTableResult{}, errors.WithMessage(err, "table manager create table") } - return schemaMap, nil -} -func (c *Cluster) loadNodeLocked(ctx context.Context) (map[string]*clusterpb.Node, error) { - nodesResult, err := c.storage.ListNodes(ctx, storage.ListNodesRequest{ClusterID: c.clusterID}) + // Add table to topology manager. + result, err := c.topologyManager.AddTable(ctx, nodeName, table) if err != nil { - return nil, errors.WithMessage(err, "list nodes") - } - - nameNodes := make(map[string]*clusterpb.Node, len(nodesResult.Nodes)) - for _, node := range nodesResult.Nodes { - nodePB := storage.ConvertNodeToPB(node) - nameNodes[node.Name] = &nodePB - } - return nameNodes, nil -} - -func (c *Cluster) loadTableLocked(ctx context.Context, schemas map[string]*clusterpb.Schema) (map[string]map[uint64]*clusterpb.Table, error) { - tables := make(map[string]map[uint64]*clusterpb.Table) - for _, schema := range schemas { - tablesResult, err := c.storage.ListTables(ctx, storage.ListTableRequest{ - ClusterID: c.clusterID, - SchemaID: storage.SchemaID(schema.Id), - }) - if err != nil { - return nil, errors.WithMessage(err, "list tables") - } - for _, table := range tablesResult.Tables { - tablePB := storage.ConvertTableToPB(table) - if t, ok := tables[schema.GetName()]; ok { - t[tablePB.Id] = &tablePB - } else { - tables[schema.GetName()] = map[uint64]*clusterpb.Table{tablePB.GetId(): &tablePB} - } - } + return CreateTableResult{}, errors.WithMessage(err, "topology manager add table") } - return tables, nil -} -func (c *Cluster) getSchemaLocked(schemaName string) (*Schema, bool) { - schema, ok := c.schemasCache[schemaName] - return schema, ok + return CreateTableResult{ + Table: table, + ShardVersionUpdate: result, + }, nil } -func (c *Cluster) getTableLocked(schemaName string, tableName string) (*Table, bool) { - table, ok := c.schemasCache[schemaName].tableMap[tableName] - return table, ok +func (c *Cluster) GetShardNodes() ([]storage.ShardNode, error) { + return c.topologyManager.GetShardNodes().shardNodes, nil } -// GetShardByID return immutable `Shard`. -func (c *Cluster) GetShardByID(id uint32) (*Shard, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - return c.getShardByIDLocked(id) +func (c *Cluster) GetShardNodesByShardID(id storage.ShardID) ([]storage.ShardNode, error) { + return c.topologyManager.GetShardNodesByID(id) } -// GetShardByID return immutable `Shard`. 
-func (c *Cluster) getShardByIDLocked(id uint32) (*Shard, error) { - shard, ok := c.shardsCache[id] - if !ok { - return nil, ErrShardNotFound.WithCausef("get shard from cache, shardID:%d", id) - } - return shard, nil +func (c *Cluster) GetShardNodeByTableIDs(tableIDs []storage.TableID) (GetShardNodesByTableIDsResult, error) { + return c.topologyManager.GetShardNodesByTableIDs(tableIDs) } -func (c *Cluster) GetShardIDsByNode(nodeName string) ([]uint32, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - shardsOfNode, ok := c.nodeShardsCache[nodeName] - if !ok { - return nil, ErrNodeNotFound.WithCausef("shards of node not found in cache, nodeName:%s", nodeName) - } - return shardsOfNode.ShardIDs, nil -} - -func (c *Cluster) RegisterNode(ctx context.Context, nodeInfo *metaservicepb.NodeInfo) error { - // FIXME: add specific method to do conversion from `metaservicepb.NodeInfo` to `clusterpb.Node`. - nodeStats := storage.NodeStats{ - Lease: nodeInfo.Lease, - Zone: nodeInfo.Zone, - NodeVersion: nodeInfo.BinaryVersion, - } - node := storage.Node{NodeStats: nodeStats, Name: nodeInfo.GetEndpoint(), State: storage.Online} - +func (c *Cluster) RegisterNode(ctx context.Context, node storage.Node, shardInfos []ShardInfo) error { + node.State = storage.Online err := c.storage.CreateOrUpdateNode(ctx, storage.CreateOrUpdateNodeRequest{ ClusterID: c.clusterID, Node: node, }) if err != nil { - return errors.WithMessagef(err, "create or update node, nodeName:%s", nodeInfo.GetEndpoint()) - } - - shardInfos := make([]*ShardInfo, 0, len(nodeInfo.ShardInfos)) - for _, shardInfo := range nodeInfo.ShardInfos { - shardInfo := &ShardInfo{ - ID: shardInfo.Id, - Role: shardInfo.Role, - Version: shardInfo.Version, - } - shardInfos = append(shardInfos, shardInfo) + return errors.WithMessagef(err, "create or update node, nodeName:%s", node.Name) } c.lock.Lock() @@ -760,23 +191,23 @@ func (c *Cluster) RegisterNode(ctx context.Context, nodeInfo *metaservicepb.Node nodePB := storage.ConvertNodeToPB(node) newRegisterNode := NewRegisteredNode(&nodePB, shardInfos) - c.registeredNodesCache[nodeInfo.GetEndpoint()] = newRegisterNode + c.registeredNodesCache[node.Name] = newRegisterNode return nil } -func (c *Cluster) GetRegisteredNodes() []*RegisteredNode { +func (c *Cluster) GetRegisteredNodes() []RegisteredNode { c.lock.RLock() defer c.lock.RUnlock() - nodes := make([]*RegisteredNode, 0, len(c.registeredNodesCache)) + nodes := make([]RegisteredNode, 0, len(c.registeredNodesCache)) for _, node := range c.registeredNodesCache { nodes = append(nodes, node) } return nodes } -func (c *Cluster) GetRegisteredNode(nodeName string) (*RegisteredNode, bool) { +func (c *Cluster) GetRegisteredNode(nodeName string) (RegisteredNode, bool) { c.lock.RLock() defer c.lock.RUnlock() @@ -784,22 +215,6 @@ func (c *Cluster) GetRegisteredNode(nodeName string) (*RegisteredNode, bool) { return registeredNode, ok } -func (c *Cluster) allocSchemaID(ctx context.Context) (uint32, error) { - id, err := c.schemaIDAlloc.Alloc(ctx) - if err != nil { - return 0, errors.WithMessage(err, "alloc schema id") - } - return uint32(id), nil -} - -func (c *Cluster) allocTableID(ctx context.Context) (uint64, error) { - id, err := c.tableIDAlloc.Alloc(ctx) - if err != nil { - return 0, errors.WithMessage(err, "alloc table id") - } - return id, nil -} - func (c *Cluster) AllocShardID(ctx context.Context) (uint32, error) { id, err := c.shardIDAlloc.Alloc(ctx) if err != nil { @@ -808,231 +223,124 @@ func (c *Cluster) AllocShardID(ctx context.Context) (uint32, error) { 
return uint32(id), nil } -func (c *Cluster) pickOneShardOnNode(nodeName string) (uint32, error) { - if shardsOfNode, ok := c.nodeShardsCache[nodeName]; ok { - shardIDs := shardsOfNode.ShardIDs - if len(shardIDs) == 0 { - return 0, ErrNodeShardsIsEmpty.WithCausef("nodeName:%s", nodeName) +func (c *Cluster) RouteTables(_ context.Context, schemaName string, tableNames []string) (RouteTablesResult, error) { + tables := make(map[storage.TableID]storage.Table, len(tableNames)) + tableIDs := make([]storage.TableID, 0, len(tableNames)) + for _, tableName := range tableNames { + table, exists, err := c.tableManager.GetTable(schemaName, tableName) + if err != nil { + return RouteTablesResult{}, errors.WithMessage(err, "get table") + } + if exists { + tables[table.ID] = table + tableIDs = append(tableIDs, table.ID) } - - idx := rand.Int31n(int32(len((shardIDs)))) // #nosec G404 - return shardIDs[idx], nil } - return 0, ErrNodeNotFound.WithCausef("nodeName:%s", nodeName) -} - -func (c *Cluster) RouteTables(_ context.Context, schemaName string, tableNames []string) (*RouteTablesResult, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - schema, ok := c.schemasCache[schemaName] - if !ok { - routeEntries := make(map[string]*RouteEntry) - return &RouteTablesResult{ - Version: c.metaData.clusterView.Version, - RouteEntries: routeEntries, - }, nil + tableShardNodesWithShardViewVersion, err := c.topologyManager.GetShardNodesByTableIDs(tableIDs) + if err != nil { + return RouteTablesResult{}, errors.WithMessagef(err, "topology get shard nodes by table ids, Tables:%v", tables) } - - routeEntries := make(map[string]*RouteEntry, len(tableNames)) - for _, tableName := range tableNames { - table, exists := schema.getTable(tableName) - if exists { - shard, err := c.GetShardByID(table.GetShardID()) - if err != nil { - return nil, errors.WithMessage(err, fmt.Sprintf("shard not found, shardID:%d", table.GetShardID())) - } - - nodeShards := make([]*NodeShard, 0, len(shard.nodes)) - for i, node := range shard.nodes { - nodeShards = append(nodeShards, &NodeShard{ - Endpoint: node.GetName(), - ShardInfo: &ShardInfo{ - ID: shard.meta[i].GetId(), - Role: shard.meta[i].GetShardRole(), - }, - }) - } - - routeEntries[tableName] = &RouteEntry{ - Table: &TableInfo{ - ID: table.GetID(), - Name: table.GetName(), - SchemaID: table.GetSchemaID(), - SchemaName: table.GetSchemaName(), - }, - NodeShards: nodeShards, - } + routeEntries := make(map[string]RouteEntry, len(tableNames)) + for tableID, value := range tableShardNodesWithShardViewVersion.ShardNodes { + nodeShards := make([]ShardNodeWithVersion, 0, len(value)) + for _, shardNode := range value { + nodeShards = append(nodeShards, ShardNodeWithVersion{ + version: tableShardNodesWithShardViewVersion.Version[shardNode.ID], + ShardNode: shardNode, + }) + } + table := tables[tableID] + routeEntries[table.Name] = RouteEntry{ + Table: TableInfo{ + ID: table.ID, + Name: table.Name, + SchemaID: table.SchemaID, + SchemaName: schemaName, + }, + NodeShards: nodeShards, } } - - return &RouteTablesResult{ - Version: c.metaData.clusterView.Version, - RouteEntries: routeEntries, + return RouteTablesResult{ + ClusterViewVersion: c.topologyManager.GetVersion(), + RouteEntries: routeEntries, }, nil } -func (c *Cluster) GetNodeShards(_ context.Context) (*GetNodeShardsResult, error) { - nodeShards := make([]*NodeShard, 0, len(c.nodeShardsCache)) +func (c *Cluster) GetNodeShards(_ context.Context) (GetNodeShardsResult, error) { + getNodeShardsResult := c.topologyManager.GetShardNodes() - 
c.lock.RLock() - defer c.lock.RUnlock() - - for nodeName, shardsOfNode := range c.nodeShardsCache { - for _, shardID := range shardsOfNode.ShardIDs { - shard, ok := c.shardsCache[shardID] - if !ok { - return nil, ErrShardNotFound.WithCausef("shardID:%d", shardID) - } - - shardMeta, ok := shard.FindShardByNode(nodeName) - if !ok { - return nil, ErrShardNotFound.WithCausef("find shard from cache") - } + shardNodesWithVersion := make([]ShardNodeWithVersion, 0, len(getNodeShardsResult.shardNodes)) - nodeShards = append(nodeShards, &NodeShard{ - Endpoint: nodeName, - ShardInfo: &ShardInfo{ - ID: shardID, - Role: shardMeta.ShardRole, - Version: shard.GetVersion(), - }, - }) - } + for _, shardNode := range getNodeShardsResult.shardNodes { + shardNodesWithVersion = append(shardNodesWithVersion, ShardNodeWithVersion{ + version: getNodeShardsResult.versions[shardNode.ID], + ShardNode: shardNode, + }) } - return &GetNodeShardsResult{ - ClusterTopologyVersion: c.metaData.clusterView.GetVersion(), - NodeShards: nodeShards, + return GetNodeShardsResult{ + ClusterTopologyVersion: c.topologyManager.GetVersion(), + NodeShards: shardNodesWithVersion, }, nil } -func (c *Cluster) GetClusterVersion() uint64 { - c.lock.RLock() - defer c.lock.RUnlock() - - return c.metaData.clusterView.Version -} - func (c *Cluster) GetClusterMinNodeCount() uint32 { c.lock.RLock() defer c.lock.RUnlock() - return c.metaData.cluster.MinNodeCount + return c.metaData.MinNodeCount } func (c *Cluster) GetTotalShardNum() uint32 { c.lock.RLock() defer c.lock.RUnlock() - return c.metaData.cluster.ShardTotal + return c.metaData.ShardTotal } -func (c *Cluster) GetClusterState() clusterpb.ClusterTopology_ClusterState { +func (c *Cluster) GetClusterState() storage.ClusterState { c.lock.RLock() defer c.lock.RUnlock() - return c.metaData.clusterView.State + return c.topologyManager.GetClusterState() } -func (c *Cluster) CreateShardViews(ctx context.Context, state clusterpb.ClusterTopology_ClusterState, shardViewPBs []*clusterpb.ShardTopology, shardNodePBs []*clusterpb.Shard) error { - c.lock.Lock() - defer c.lock.Unlock() - - shardViews := make([]storage.ShardView, 0, len(shardViewPBs)) - for _, shardViewPB := range shardViewPBs { - shardViews = append(shardViews, storage.ConvertShardViewPB(shardViewPB)) - } - err := c.storage.CreateShardViews(ctx, storage.CreateShardViewsRequest{ - ClusterID: c.clusterID, - ShardViews: shardViews, - }) - if err != nil { - return errors.WithMessage(err, "create shard views") - } - - clusterViewResult, err := c.storage.GetClusterView(ctx, storage.GetClusterViewRequest{ - ClusterID: c.clusterID, - }) - if err != nil { - return errors.WithMessage(err, "get cluster view") - } - - shardNodes := make([]storage.ShardNode, 0, len(shardNodePBs)) - for _, shardNodePB := range shardNodePBs { - shardNodes = append(shardNodes, storage.ConvertShardNodePB(shardNodePB)) - } - - clusterViewResult.ClusterView.ShardNodes = shardNodes - clusterViewResult.ClusterView.State = storage.ConvertClusterStatePB(state) - preVersion := clusterViewResult.ClusterView.Version - clusterViewResult.ClusterView.Version++ - - if err = c.storage.UpdateClusterView(ctx, storage.PutClusterViewRequest{ - ClusterID: c.clusterID, - ClusterView: clusterViewResult.ClusterView, - LatestVersion: preVersion, - }); err != nil { - return err +func (c *Cluster) UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error { + if err := c.topologyManager.UpdateClusterView(ctx, state, shardNodes); err != nil { + return 
errors.WithMessage(err, "update cluster view") } - clusterViewPB := storage.ConvertClusterViewToPB(clusterViewResult.ClusterView) - c.metaData.clusterView = &clusterViewPB - - return c.updateTopologyCache(shardViewPBs, shardNodePBs) + return nil } -func (c *Cluster) updateTopologyCache(shardTopologies []*clusterpb.ShardTopology, shardView []*clusterpb.Shard) error { - shardsByID := make(map[uint32]*clusterpb.Shard, len(shardView)) - for _, shard := range shardView { - shardsByID[shard.Id] = shard +func (c *Cluster) CreateShardViews(ctx context.Context, views []CreateShardView) error { + if err := c.topologyManager.CreateShardViews(ctx, views); err != nil { + return errors.WithMessage(err, "create shard views") } - newNodeShardsCache := map[string]*ShardsOfNode{} - newShardsCache := make(map[uint32]*Shard, len(shardTopologies)) + return nil +} - for _, shardTopology := range shardTopologies { - shard, ok := shardsByID[shardTopology.ShardId] - if !ok { - return ErrShardNotFound.WithCausef("updateTopologyCache missing shard in shardView, shard_id:%d", shardTopology.ShardId) - } - nodeName := shard.Node +// Initialize the cluster view and shard view of the cluster. +// It will be used when we create the cluster. +func (c *Cluster) init(ctx context.Context) error { + c.lock.Lock() + defer c.lock.Unlock() - nodeShards, ok := newNodeShardsCache[nodeName] - if !ok { - nodeShards = &ShardsOfNode{ - Endpoint: nodeName, - ShardIDs: []uint32{}, - } - newNodeShardsCache[nodeName] = nodeShards - } - nodeShards.ShardIDs = append(nodeShards.ShardIDs, shardTopology.ShardId) + return c.topologyManager.InitClusterView(ctx) +} - cachedShard, ok := newShardsCache[shardTopology.ShardId] - if !ok { - cachedShard = &Shard{ - meta: []*clusterpb.Shard{}, - nodes: []*clusterpb.Node{}, - tables: map[uint64]*Table{}, - version: shardTopology.Version, - } - newShardsCache[shardTopology.ShardId] = cachedShard - } - cachedShard.meta = append(cachedShard.meta, shard) - // TODO: Here shardsCache should not contain the register node information (shardsCache will be refactored in the future), - // so here no need to set these missing fields in clusterpb.Node. - nodePB := &clusterpb.Node{ - Name: shard.Node, - } - cachedShard.nodes = append(cachedShard.nodes, nodePB) - } +// Load cluster meta from storage to memory. +func (c *Cluster) load(ctx context.Context) error { + c.lock.Lock() + defer c.lock.Unlock() - for nodeName, shardsOfNode := range newNodeShardsCache { - c.nodeShardsCache[nodeName] = shardsOfNode + if err := c.tableManager.Load(ctx); err != nil { + return errors.WithMessage(err, "load table manager") } - for shardID, shard := range newShardsCache { - c.shardsCache[shardID] = shard + if err := c.topologyManager.Load(ctx); err != nil { + return errors.WithMessage(err, "load topology manager") } return nil diff --git a/server/cluster/data/table_manager.go b/server/cluster/data/table_manager.go deleted file mode 100644 index dc1420a4..00000000 --- a/server/cluster/data/table_manager.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -package data - -import ( - "sync" - - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - "github.com/CeresDB/ceresmeta/server/storage" -) - -// TableManager manages table metadata by schema. -type TableManager interface{} - -// nolint -type TableManagerImpl struct { - storage storage.Storage - - // RWMutex is used to protect following fields. 
- lock sync.RWMutex - schemaMeta *clusterpb.Schema // schema meta info in storage - schemaTables map[string]*Tables // schemaName -> tables -} - -// nolint -type Tables struct { - tables map[string]*clusterpb.Table -} diff --git a/server/cluster/data/topology_manager.go b/server/cluster/data/topology_manager.go deleted file mode 100644 index 51a62da6..00000000 --- a/server/cluster/data/topology_manager.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -package data - -import ( - "sync" - - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - "github.com/CeresDB/ceresmeta/server/storage" -) - -// TopologyManager manages the cluster topology, including the mapping relationship between shards, nodes, and tables. -type TopologyManager interface{} - -// nolint -type TopologyManagerImpl struct { - storage storage.Storage - - // RWMutex is used to protect following fields. - lock sync.RWMutex - // ClusterTopology in memory. - shardNodesMapping map[uint32][]*clusterpb.Shard // shardID -> nodes of the shard - nodeShardsMapping map[string][]*clusterpb.Shard // nodeName -> shards of the node - version uint64 // clusterTopology version - // ShardTopology in memory. - shardTablesMapping map[uint32]*clusterpb.ShardTopology // shardID -> shardTopology - tableShardMapping map[uint32]uint32 // tableID -> shardID -} diff --git a/server/cluster/error.go b/server/cluster/error.go index fbee9cdc..061113db 100644 --- a/server/cluster/error.go +++ b/server/cluster/error.go @@ -19,5 +19,5 @@ var ( ErrNodeShardsIsEmpty = coderr.NewCodeError(coderr.Internal, "node's shard list is empty") ErrGetShardView = coderr.NewCodeError(coderr.Internal, "get shard view") ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists") - ErrShardViewVersionNotMatch = coderr.NewCodeError(coderr.Internal, "shard view version not match") + ErrShardViewVersionNotMatch = coderr.NewCodeError(coderr.Internal, "shard view versions not match") ) diff --git a/server/cluster/manager.go b/server/cluster/manager.go index 5fa480c6..7a48e6f3 100644 --- a/server/cluster/manager.go +++ b/server/cluster/manager.go @@ -34,17 +34,13 @@ type Manager interface { GetCluster(ctx context.Context, clusterName string) (*Cluster, error) // AllocSchemaID means get or create schema. // The second output parameter bool: Returns true if the table was newly created. - AllocSchemaID(ctx context.Context, clusterName, schemaName string) (uint32, bool, error) - // AllocTableID means get or create table. - // The second output parameter bool: Returns true if the table was newly created. 
- AllocTableID(ctx context.Context, clusterName, schemaName, tableName, nodeName string) (*Table, bool, error) - GetTables(ctx context.Context, clusterName, nodeName string, shardIDs []uint32) (map[uint32]*ShardTables, error) + AllocSchemaID(ctx context.Context, clusterName, schemaName string) (storage.SchemaID, bool, error) + GetTables(clusterName, nodeName string, shardIDs []storage.ShardID) (map[storage.ShardID]ShardTables, error) DropTable(ctx context.Context, clusterName, schemaName, tableName string) error - GetShardIDs(ctx context.Context, clusterName, nodeName string) ([]uint32, error) - RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (*RouteTablesResult, error) - GetNodeShards(ctx context.Context, clusterName string) (*GetNodeShardsResult, error) + RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (RouteTablesResult, error) + GetNodeShards(ctx context.Context, clusterName string) (GetNodeShardsResult, error) - RegisterNode(ctx context.Context, clusterName string, nodeInfo *metaservicepb.NodeInfo) error + RegisterNode(ctx context.Context, clusterName string, nodeName storage.Node, shardInfos []ShardInfo) error GetRegisteredNode(ctx context.Context, clusterName string, node string) (*metaservicepb.NodeInfo, error) } @@ -132,7 +128,7 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, ini return nil, errors.WithMessagef(err, "cluster manager CreateCluster, clusterName:%s", clusterName) } - if err := cluster.Load(ctx); err != nil { + if err := cluster.load(ctx); err != nil { log.Error("fail to load cluster", zap.Error(err)) return nil, errors.WithMessagef(err, "cluster manager CreateCluster, clusterName:%s", clusterName) } @@ -153,7 +149,7 @@ func (m *managerImpl) GetCluster(_ context.Context, clusterName string) (*Cluste return nil, ErrClusterNotFound } -func (m *managerImpl) AllocSchemaID(ctx context.Context, clusterName, schemaName string) (uint32, bool, error) { +func (m *managerImpl) AllocSchemaID(ctx context.Context, clusterName, schemaName string) (storage.SchemaID, bool, error) { cluster, err := m.getCluster(clusterName) if err != nil { log.Error("cluster not found", zap.Error(err)) @@ -167,79 +163,36 @@ func (m *managerImpl) AllocSchemaID(ctx context.Context, clusterName, schemaName return 0, false, errors.WithMessagef(err, "cluster manager AllocSchemaID, "+ "clusterName:%s, schemaName:%s", clusterName, schemaName) } - return schema.GetID(), exists, nil -} - -func (m *managerImpl) AllocTableID(ctx context.Context, clusterName, schemaName, tableName, nodeName string) (*Table, bool, error) { - cluster, err := m.getCluster(clusterName) - if err != nil { - log.Error("cluster not found", zap.Error(err)) - return nil, false, errors.WithMessage(err, "get cluster") - } - - table, exists, err := cluster.GetTable(ctx, schemaName, tableName) - if err != nil { - return nil, false, errors.WithMessage(err, "get table") - } - - if exists { - return table, true, nil - } - - ret, err := cluster.CreateTable(ctx, nodeName, schemaName, tableName) - if err != nil { - log.Error("fail to create table", zap.Error(err)) - return nil, false, errors.WithMessagef(err, "create table, "+ - "clusterName:%s, schemaName:%s, tableName:%s, nodeName:%s", clusterName, schemaName, tableName, nodeName) - } - return ret.Table, false, nil + return schema.ID, exists, nil } -func (m *managerImpl) GetTables(ctx context.Context, clusterName, nodeName string, shardIDs []uint32) (map[uint32]*ShardTables, error) { +func (m 
*managerImpl) GetTables(clusterName, nodeName string, shardIDs []storage.ShardID) (map[storage.ShardID]ShardTables, error) { cluster, err := m.getCluster(clusterName) if err != nil { - log.Error("cluster not found", zap.Error(err)) - return nil, errors.WithMessage(err, "cluster manager GetTables") - } - - shardTablesWithRole, err := cluster.GetTables(ctx, shardIDs, nodeName) - if err != nil { - return nil, errors.WithMessagef(err, "cluster manager GetTables, "+ - "clusterName:%s, nodeName:%s, shardIDs:%v", clusterName, nodeName, shardIDs) + return nil, errors.WithMessagef(err, "get cluster, cluster name:%s", clusterName) } - ret := make(map[uint32]*ShardTables, len(shardIDs)) - for shardID, shardTables := range shardTablesWithRole { - tableInfos := make([]*TableInfo, 0, len(shardTables.tables)) - - for _, t := range shardTables.tables { - tableInfos = append(tableInfos, &TableInfo{ - ID: t.meta.GetId(), Name: t.meta.GetName(), - SchemaID: t.schema.GetId(), SchemaName: t.schema.GetName(), - }) - } - ret[shardID] = &ShardTables{Shard: shardTables.shard, Tables: tableInfos} - } - return ret, nil + shardTables := cluster.GetTables(shardIDs, nodeName) + return shardTables, nil } func (m *managerImpl) DropTable(ctx context.Context, clusterName, schemaName, tableName string) error { cluster, err := m.getCluster(clusterName) if err != nil { log.Error("cluster not found", zap.Error(err)) - return errors.WithMessage(err, "cluster manager DropTable") + return errors.WithMessage(err, "cluster manager drop table") } _, err = cluster.DropTable(ctx, schemaName, tableName) if err != nil { - return errors.WithMessagef(err, "cluster manager DropTable, clusterName:%s, schemaName:%s, tableName:%s", + return errors.WithMessagef(err, "cluster manager drop table, clusterName:%s, schemaName:%s, tableName:%s", clusterName, schemaName, tableName) } return nil } -func (m *managerImpl) RegisterNode(ctx context.Context, clusterName string, nodeInfo *metaservicepb.NodeInfo) error { +func (m *managerImpl) RegisterNode(ctx context.Context, clusterName string, node storage.Node, shardInfos []ShardInfo) error { m.lock.RLock() defer m.lock.RUnlock() @@ -252,7 +205,7 @@ func (m *managerImpl) RegisterNode(ctx context.Context, clusterName string, node log.Error("cluster not found", zap.Error(err)) return errors.WithMessage(err, "cluster manager RegisterNode") } - err = cluster.RegisterNode(ctx, nodeInfo) + err = cluster.RegisterNode(ctx, node, shardInfos) if err != nil { return errors.WithMessage(err, "cluster manager RegisterNode") } @@ -287,20 +240,6 @@ func (m *managerImpl) GetRegisteredNode(_ context.Context, clusterName string, n return &nodeInfo, nil } -func (m *managerImpl) GetShardIDs(_ context.Context, clusterName, nodeName string) ([]uint32, error) { - cluster, err := m.getCluster(clusterName) - if err != nil { - log.Error("cluster not found", zap.Error(err)) - return nil, errors.WithMessage(err, "cluster manager GetShards") - } - - shardIDs, err := cluster.GetShardIDsByNode(nodeName) - if err != nil { - return nil, errors.WithMessage(err, "cluster manager GetShards") - } - return shardIDs, nil -} - func (m *managerImpl) getCluster(clusterName string) (*Cluster, error) { m.lock.RLock() cluster, ok := m.clusters[clusterName] @@ -337,7 +276,7 @@ func (m *managerImpl) Start(ctx context.Context) error { m.clusters = make(map[string]*Cluster, len(clusters.Clusters)) for _, cluster := range clusters.Clusters { cluster := NewCluster(cluster, m.storage, m.kv, m.rootPath, m.idAllocatorStep) - if err := cluster.Load(ctx); err 
!= nil { + if err := cluster.load(ctx); err != nil { log.Error("fail to load cluster", zap.String("cluster", cluster.Name()), zap.Error(err)) return errors.WithMessagef(err, "fail to load cluster:%v", cluster.Name()) } @@ -362,31 +301,31 @@ func (m *managerImpl) Stop(_ context.Context) error { return nil } -func (m *managerImpl) RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (*RouteTablesResult, error) { +func (m *managerImpl) RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (RouteTablesResult, error) { cluster, err := m.getCluster(clusterName) if err != nil { log.Error("cluster not found", zap.Error(err)) - return nil, errors.WithMessage(err, "cluster manager routeTables") + return RouteTablesResult{}, errors.WithMessage(err, "cluster manager routeTables") } ret, err := cluster.RouteTables(ctx, schemaName, tableNames) if err != nil { log.Error("cluster manager RouteTables", zap.Error(err)) - return nil, errors.WithMessage(err, "cluster manager routeTables") + return RouteTablesResult{}, errors.WithMessage(err, "cluster manager routeTables") } return ret, nil } -func (m *managerImpl) GetNodeShards(ctx context.Context, clusterName string) (*GetNodeShardsResult, error) { +func (m *managerImpl) GetNodeShards(ctx context.Context, clusterName string) (GetNodeShardsResult, error) { cluster, err := m.getCluster(clusterName) if err != nil { - return nil, errors.WithMessage(err, "cluster manager GetNodes") + return GetNodeShardsResult{}, errors.WithMessage(err, "cluster manager GetNodes") } ret, err := cluster.GetNodeShards(ctx) if err != nil { - return nil, errors.WithMessage(err, "cluster manager GetNodes") + return GetNodeShardsResult{}, errors.WithMessage(err, "cluster manager GetNodes") } return ret, nil diff --git a/server/cluster/node.go b/server/cluster/node.go index aeaf3eba..63ad8f1a 100644 --- a/server/cluster/node.go +++ b/server/cluster/node.go @@ -8,17 +8,17 @@ import ( type RegisteredNode struct { meta *clusterpb.Node - shardInfos []*ShardInfo + shardInfos []ShardInfo } -func NewRegisteredNode(meta *clusterpb.Node, shardInfos []*ShardInfo) *RegisteredNode { - return &RegisteredNode{ +func NewRegisteredNode(meta *clusterpb.Node, shardInfos []ShardInfo) RegisteredNode { + return RegisteredNode{ meta, shardInfos, } } -func (n *RegisteredNode) GetShardInfos() []*ShardInfo { +func (n *RegisteredNode) GetShardInfos() []ShardInfo { return n.shardInfos } diff --git a/server/cluster/schema.go b/server/cluster/schema.go deleted file mode 100644 index 9b8f4f7e..00000000 --- a/server/cluster/schema.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -package cluster - -import "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - -type Schema struct { - meta *clusterpb.Schema - - tableMap map[string]*Table -} - -func (s *Schema) GetID() uint32 { - return s.meta.GetId() -} - -func (s *Schema) GetName() string { - return s.meta.GetName() -} - -func (s *Schema) getTable(tableName string) (*Table, bool) { - table, ok := s.tableMap[tableName] - return table, ok -} - -func (s *Schema) dropTableLocked(tableName string) { - delete(s.tableMap, tableName) -} diff --git a/server/cluster/shard.go b/server/cluster/shard.go deleted file mode 100644 index 045935af..00000000 --- a/server/cluster/shard.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
- -package cluster - -import ( - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" -) - -const ( - MinShardID = 0 -) - -type Shard struct { - // FIXME: The relationship between shard and node is missing here. - meta []*clusterpb.Shard - nodes []*clusterpb.Node - tables map[uint64]*Table // table_id => table - version uint64 -} - -// FIXME: avoid returning *clusterpb.Shard. -func (s *Shard) FindShardByNode(nodeName string) (*clusterpb.Shard, bool) { - for i := range s.nodes { - if s.nodes[i].GetName() == nodeName { - shard := s.meta[i] - return shard, true - } - } - - return nil, false -} - -func (s *Shard) dropTableLocked(tableID uint64) { - delete(s.tables, tableID) -} - -func (s *Shard) GetLeader() *clusterpb.Shard { - for i, shard := range s.meta { - if clusterpb.ShardRole_LEADER == shard.ShardRole { - return s.meta[i] - } - } - return nil -} - -func (s *Shard) GetVersion() uint64 { - return s.version -} - -type ShardTablesWithRole struct { - shard *ShardInfo - tables []*Table -} diff --git a/server/cluster/table.go b/server/cluster/table.go index a54a8596..12339c64 100644 --- a/server/cluster/table.go +++ b/server/cluster/table.go @@ -1,40 +1,3 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. package cluster - -import "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - -type Table struct { - shardID uint32 - schema *clusterpb.Schema - meta *clusterpb.Table -} - -func (t *Table) GetInfo() TableInfo { - return TableInfo{ - ID: t.GetID(), - Name: t.GetName(), - SchemaID: t.GetSchemaID(), - SchemaName: t.GetSchemaName(), - } -} - -func (t *Table) GetID() uint64 { - return t.meta.GetId() -} - -func (t *Table) GetName() string { - return t.meta.GetName() -} - -func (t *Table) GetSchemaName() string { - return t.schema.GetName() -} - -func (t *Table) GetSchemaID() uint32 { - return t.schema.GetId() -} - -func (t *Table) GetShardID() uint32 { - return t.shardID -} diff --git a/server/cluster/table_manager.go b/server/cluster/table_manager.go new file mode 100644 index 00000000..c8d7cd98 --- /dev/null +++ b/server/cluster/table_manager.go @@ -0,0 +1,287 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +package cluster + +import ( + "context" + "sync" + "time" + + "github.com/CeresDB/ceresmeta/server/id" + + "github.com/CeresDB/ceresmeta/pkg/log" + "go.uber.org/zap" + + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/pkg/errors" +) + +// TableManager manages table metadata by schema. +type TableManager interface { + Load(context.Context) error + GetTable(schemaName string, tableName string) (storage.Table, bool, error) + GetTablesByIDs([]storage.TableID) []storage.Table + CreateTable(ctx context.Context, schemaName string, tableName string) (storage.Table, error) + DropTable(ctx context.Context, schemaName string, tableName string) error + GetSchemaByName(string) (storage.Schema, bool) + GetSchemas() []storage.Schema + GetOrCreateSchema(context.Context, string) (storage.Schema, bool, error) +} + +// nolint +type Tables struct { + tables map[string]storage.Table + tablesByID map[storage.TableID]storage.Table +} + +// nolint +type TableManagerImpl struct { + storage storage.Storage + clusterID storage.ClusterID + schemaIDAlloc id.Allocator + tableIDAlloc id.Allocator + + // RWMutex is used to protect following fields. 
+	lock         sync.RWMutex
+	schemas      map[string]storage.Schema           // schemaName -> schema
+	schemasByID  map[storage.SchemaID]storage.Schema // schemaID -> schema
+	schemaTables map[storage.SchemaID]*Tables        // schemaID -> tables of the schema
+}
+
+func NewTableManagerImpl(storage storage.Storage, clusterID storage.ClusterID, schemaIDAlloc id.Allocator, tableIDAlloc id.Allocator) TableManager {
+	return &TableManagerImpl{
+		storage:       storage,
+		clusterID:     clusterID,
+		schemaIDAlloc: schemaIDAlloc,
+		tableIDAlloc:  tableIDAlloc,
+	}
+}
+
+func (m *TableManagerImpl) Load(ctx context.Context) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	if err := m.loadSchema(ctx); err != nil {
+		return errors.WithMessage(err, "load schemas")
+	}
+
+	if err := m.loadTable(ctx); err != nil {
+		return errors.WithMessage(err, "load tables")
+	}
+
+	return nil
+}
+
+func (m *TableManagerImpl) GetTable(schemaName, tableName string) (storage.Table, bool, error) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	return m.getTable(schemaName, tableName)
+}
+
+func (m *TableManagerImpl) GetTablesByIDs(tableIDs []storage.TableID) []storage.Table {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	result := make([]storage.Table, 0, len(tableIDs))
+	for _, tables := range m.schemaTables {
+		for _, tableID := range tableIDs {
+			table, ok := tables.tablesByID[tableID]
+			if !ok {
+				log.Warn("table not exists", zap.Uint64("tableID", uint64(tableID)))
+				continue
+			}
+			result = append(result, table)
+		}
+	}
+
+	return result
+}
+
+func (m *TableManagerImpl) CreateTable(ctx context.Context, schemaName string, tableName string) (storage.Table, error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	_, exists, err := m.getTable(schemaName, tableName)
+	if err != nil {
+		return storage.Table{}, errors.WithMessage(err, "create table")
+	}
+
+	if exists {
+		return storage.Table{}, ErrTableAlreadyExists
+	}
+
+	// Create table in storage.
+	schema, ok := m.schemas[schemaName]
+	if !ok {
+		return storage.Table{}, ErrSchemaNotFound.WithCausef("schema name:%s", schemaName)
+	}
+
+	tableID, err := m.tableIDAlloc.Alloc(ctx)
+	if err != nil {
+		return storage.Table{}, errors.WithMessage(err, "alloc table id")
+	}
+
+	table := storage.Table{
+		ID:        storage.TableID(tableID),
+		Name:      tableName,
+		SchemaID:  schema.ID,
+		CreatedAt: uint64(time.Now().UnixMilli()),
+	}
+	err = m.storage.CreateTable(ctx, storage.CreateTableRequest{
+		ClusterID: m.clusterID,
+		SchemaID:  schema.ID,
+		Table:     table,
+	})
+
+	if err != nil {
+		return storage.Table{}, errors.WithMessage(err, "create table in storage")
+	}
+
+	// Update table in memory.
+	_, ok = m.schemaTables[schema.ID]
+	if !ok {
+		m.schemaTables[schema.ID] = &Tables{
+			tables:     make(map[string]storage.Table),
+			tablesByID: make(map[storage.TableID]storage.Table),
+		}
+	}
+	tables := m.schemaTables[schema.ID]
+	tables.tables[tableName] = table
+	tables.tablesByID[table.ID] = table
+
+	return table, nil
+}
+
+func (m *TableManagerImpl) DropTable(ctx context.Context, schemaName string, tableName string) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	schema, ok := m.schemas[schemaName]
+	if !ok {
+		return nil
+	}
+
+	schemaTables, ok := m.schemaTables[schema.ID]
+	if !ok {
+		return nil
+	}
+
+	table, ok := schemaTables.tables[tableName]
+	if !ok {
+		return nil
+	}
+
+	// Drop table in storage.
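+	// The record is removed from storage first; the in-memory caches below are only updated once the delete succeeds.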
+	err := m.storage.DeleteTable(ctx, storage.DeleteTableRequest{
+		ClusterID: m.clusterID,
+		SchemaID:  schema.ID,
+		TableName: tableName,
+	})
+	if err != nil {
+		return errors.WithMessagef(err, "storage delete table, clusterID:%d, schema:%s, tableName:%s",
+			m.clusterID, schemaName, tableName)
+	}
+
+	tables := m.schemaTables[schema.ID]
+	delete(tables.tables, tableName)
+	delete(tables.tablesByID, table.ID)
+	return nil
+}
+
+func (m *TableManagerImpl) GetSchemaByName(schemaName string) (storage.Schema, bool) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	schema, ok := m.schemas[schemaName]
+	return schema, ok
+}
+
+func (m *TableManagerImpl) GetSchemas() []storage.Schema {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	schemas := make([]storage.Schema, 0, len(m.schemas))
+
+	for _, schema := range m.schemas {
+		schemas = append(schemas, schema)
+	}
+
+	return schemas
+}
+
+func (m *TableManagerImpl) GetOrCreateSchema(ctx context.Context, schemaName string) (storage.Schema, bool, error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	schema, ok := m.schemas[schemaName]
+	if ok {
+		return schema, true, nil
+	}
+
+	schemaID, err := m.schemaIDAlloc.Alloc(ctx)
+	if err != nil {
+		return storage.Schema{}, false, errors.WithMessage(err, "alloc schema id")
+	}
+
+	schema = storage.Schema{
+		ID:        storage.SchemaID(schemaID),
+		ClusterID: m.clusterID,
+		Name:      schemaName,
+		CreatedAt: uint64(time.Now().UnixMilli()),
+	}
+
+	// Create schema in storage.
+	if err = m.storage.CreateSchema(ctx, storage.CreateSchemaRequest{
+		ClusterID: m.clusterID,
+		Schema:    schema,
+	}); err != nil {
+		return storage.Schema{}, false, errors.WithMessage(err, "create schema in storage")
+	}
+	// Update schema in memory.
+	m.schemas[schemaName] = schema
+	return schema, false, nil
+}
+
+func (m *TableManagerImpl) loadSchema(ctx context.Context) error {
+	schemasResult, err := m.storage.ListSchemas(ctx, storage.ListSchemasRequest{ClusterID: m.clusterID})
+	if err != nil {
+		return errors.WithMessage(err, "list schemas")
+	}
+
+	m.schemas = make(map[string]storage.Schema, len(schemasResult.Schemas))
+	for _, schema := range schemasResult.Schemas {
+		m.schemas[schema.Name] = schema
+	}
+
+	return nil
+}
+
+func (m *TableManagerImpl) loadTable(ctx context.Context) error {
+	m.schemaTables = make(map[storage.SchemaID]*Tables, len(m.schemas))
+	for _, schema := range m.schemas {
+		tablesResult, err := m.storage.ListTables(ctx, storage.ListTableRequest{
+			ClusterID: m.clusterID,
+			SchemaID:  schema.ID,
+		})
+		if err != nil {
+			return errors.WithMessage(err, "list tables")
+		}
+		for _, table := range tablesResult.Tables {
+			tables, ok := m.schemaTables[table.SchemaID]
+			if !ok {
+				tables = &Tables{
+					tables:     make(map[string]storage.Table, 0),
+					tablesByID: make(map[storage.TableID]storage.Table, 0),
+				}
+				m.schemaTables[table.SchemaID] = tables
+			}
+
+			tables.tables[table.Name] = table
+			tables.tablesByID[table.ID] = table
+		}
+	}
+	return nil
+}
+
+func (m *TableManagerImpl) getTable(schemaName, tableName string) (storage.Table, bool, error) {
+	schema, ok := m.schemas[schemaName]
+	if !ok {
+		return storage.Table{}, false, ErrSchemaNotFound.WithCausef("schema name:%s", schemaName)
+	}
+	tables, ok := m.schemaTables[schema.ID]
+	if !ok {
+		return storage.Table{}, false, nil
+	}
+
+	table, ok := tables.tables[tableName]
+	return table, ok, nil
+}
diff --git a/server/cluster/manager_test.go b/server/cluster/table_manager_test.go
similarity index 76%
rename from server/cluster/manager_test.go
rename to server/cluster/table_manager_test.go
index 375dc56c..32b32ac3 100644
--- a/server/cluster/manager_test.go
+++ 
b/server/cluster/table_manager_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/server/etcdutil" "github.com/CeresDB/ceresmeta/server/storage" "github.com/stretchr/testify/require" @@ -85,27 +84,27 @@ func TestManagerSingleThread(t *testing.T) { testCreateCluster(ctx, re, manager, cluster1) - testRegisterNode(ctx, re, manager, cluster1, node1, defaultLease) - testRegisterNode(ctx, re, manager, cluster1, node2, defaultLease) + testRegisterNode(ctx, re, manager, cluster1, node1) + testRegisterNode(ctx, re, manager, cluster1, node2) - testGetTables(ctx, re, manager, node1, cluster1, 0) + testGetTables(re, manager, node1, cluster1, 0) testAllocSchemaID(ctx, re, manager, cluster1, defaultSchema, defaultSchemaID) testAllocSchemaID(ctx, re, manager, cluster1, defaultSchema, defaultSchemaID) - testAllocTableID(ctx, re, manager, node1, cluster1, defaultSchema, table1, tableID1) - testAllocTableID(ctx, re, manager, node1, cluster1, defaultSchema, table1, tableID1) - testAllocTableID(ctx, re, manager, node1, cluster1, defaultSchema, table2, tableID2) - testAllocTableID(ctx, re, manager, node2, cluster1, defaultSchema, table3, tableID3) - testAllocTableID(ctx, re, manager, node2, cluster1, defaultSchema, table4, tableID4) + testCreateTable(ctx, re, manager, node1, cluster1, defaultSchema, table1, tableID1) + testCreateTable(ctx, re, manager, node1, cluster1, defaultSchema, table1, tableID1) + testCreateTable(ctx, re, manager, node1, cluster1, defaultSchema, table2, tableID2) + testCreateTable(ctx, re, manager, node2, cluster1, defaultSchema, table3, tableID3) + testCreateTable(ctx, re, manager, node2, cluster1, defaultSchema, table4, tableID4) testRouteTables(ctx, re, manager, cluster1, defaultSchema, []string{table1, table2, table3, table4}) testDropTable(ctx, re, manager, cluster1, defaultSchema, table1) testDropTable(ctx, re, manager, cluster1, defaultSchema, table3) - testGetTables(ctx, re, manager, node1, cluster1, 1) - testGetTables(ctx, re, manager, node2, cluster1, 1) + testGetTables(re, manager, node1, cluster1, 1) + testGetTables(re, manager, node2, cluster1, 1) testGetNodes(ctx, re, manager, cluster1) re.NoError(manager.Stop(ctx)) @@ -115,8 +114,8 @@ func TestManagerSingleThread(t *testing.T) { re.NoError(manager.Start(ctx)) - testGetTables(ctx, re, manager, node1, cluster1, 1) - testGetTables(ctx, re, manager, node2, cluster1, 1) + testGetTables(re, manager, node1, cluster1, 1) + testGetTables(re, manager, node2, cluster1, 1) re.NoError(manager.Stop(ctx)) } @@ -146,8 +145,8 @@ func TestManagerMultiThread(t *testing.T) { func testCluster(ctx context.Context, re *require.Assertions, manager Manager, clusterName string) { testCreateCluster(ctx, re, manager, clusterName) - testRegisterNode(ctx, re, manager, clusterName, node1, defaultLease) - testRegisterNode(ctx, re, manager, clusterName, node2, defaultLease) + testRegisterNode(ctx, re, manager, clusterName, node1) + testRegisterNode(ctx, re, manager, clusterName, node2) testAllocSchemaIDMultiThread(ctx, re, manager, clusterName, defaultSchema, defaultSchemaID) @@ -164,12 +163,13 @@ func testCreateCluster(ctx context.Context, re *require.Assertions, manager Mana } func testRegisterNode(ctx context.Context, re *require.Assertions, manager Manager, - cluster, node string, lease uint32, + clusterName, nodeName string, ) { - err := manager.RegisterNode(ctx, cluster, &metaservicepb.NodeInfo{ - Endpoint: node, - Lease: lease, 
- }) + err := manager.RegisterNode(ctx, clusterName, storage.Node{ + Name: nodeName, + LastTouchTime: uint64(time.Now().UnixMilli()), + State: storage.Online, + }, []ShardInfo{}) re.NoError(err) } @@ -181,19 +181,22 @@ func testAllocSchemaID(ctx context.Context, re *require.Assertions, manager Mana re.Equal(schemaID, id) } -func testAllocTableID(ctx context.Context, re *require.Assertions, manager Manager, - node, cluster, schema, tableName string, tableID uint64, +func testCreateTable(ctx context.Context, re *require.Assertions, manager Manager, + node, clusterName, schema, tableName string, tableID uint64, ) { - table, _, err := manager.AllocTableID(ctx, cluster, schema, tableName, node) + c, err := manager.GetCluster(ctx, clusterName) re.NoError(err) - re.Equal(tableID, table.GetID()) -} - -func testGetTables(ctx context.Context, re *require.Assertions, manager Manager, node, cluster string, num int) { - shardIDs, err := manager.GetShardIDs(ctx, cluster, node) + table, err := c.CreateTable(ctx, node, schema, tableName) re.NoError(err) + re.Equal(tableID, table.Table.ID) +} - shardTables, err := manager.GetTables(ctx, cluster, node, shardIDs) +func testGetTables(re *require.Assertions, manager Manager, node, cluster string, num int) { + shardIDs := make([]storage.ShardID, 0, defaultShardTotal) + for i := 0; i < defaultShardTotal; i++ { + shardIDs = append(shardIDs, storage.ShardID(i)) + } + shardTables, err := manager.GetTables(cluster, node, shardIDs) re.NoError(err) re.Equal(4, len(shardTables)) @@ -208,11 +211,11 @@ func testGetTables(ctx context.Context, re *require.Assertions, manager Manager, func testRouteTables(ctx context.Context, re *require.Assertions, manager Manager, cluster, schema string, tableNames []string) { ret, err := manager.RouteTables(ctx, cluster, schema, tableNames) re.NoError(err) - re.Equal(uint64(0), ret.Version) + re.Equal(uint64(0), ret.ClusterViewVersion) re.Equal(len(tableNames), len(ret.RouteEntries)) for _, entry := range ret.RouteEntries { re.Equal(1, len(entry.NodeShards)) - re.Equal(clusterpb.ShardRole_LEADER, entry.NodeShards[0].ShardInfo.Role) + re.Equal(storage.Leader, entry.NodeShards[0].ShardNode.ShardRole) } } @@ -240,11 +243,11 @@ func testAllocTableIDMultiThread(ctx context.Context, re *require.Assertions, ma wg.Add(1) go func() { defer wg.Done() - testAllocTableID(ctx, re, manager, node1, clusterName, defaultSchema, table1, tableID) + testCreateTable(ctx, re, manager, node1, clusterName, defaultSchema, table1, tableID) }() } - testAllocTableID(ctx, re, manager, node2, clusterName, defaultSchema, table1, tableID) + testCreateTable(ctx, re, manager, node2, clusterName, defaultSchema, table1, tableID) wg.Wait() } diff --git a/server/cluster/topology_manager.go b/server/cluster/topology_manager.go new file mode 100644 index 00000000..34da8881 --- /dev/null +++ b/server/cluster/topology_manager.go @@ -0,0 +1,427 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +package cluster + +import ( + "context" + "math/rand" + "sync" + "time" + + "github.com/CeresDB/ceresmeta/server/id" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/pkg/errors" +) + +// TopologyManager manages the cluster topology, including the mapping relationship between shards, nodes, and Tables. +type TopologyManager interface { + Load(context.Context) error + // GetVersion get cluster view versions. 
+ GetVersion() uint64 + GetClusterState() storage.ClusterState + GetTableIDs(shardIDs []storage.ShardID, nodeName string) map[storage.ShardID]ShardTableIDs + AddTable(ctx context.Context, nodeName string, table storage.Table) (ShardVersionUpdate, error) + RemoveTable(context.Context, storage.TableID) (ShardVersionUpdate, error) + GetShardNodesByID(storage.ShardID) ([]storage.ShardNode, error) + GetShardNodesByTableIDs(tableID []storage.TableID) (GetShardNodesByTableIDsResult, error) + GetShardNodes() GetShardNodesResult + InitClusterView(context.Context) error + UpdateClusterView(context.Context, storage.ClusterState, []storage.ShardNode) error + CreateShardViews(context.Context, []CreateShardView) error +} + +type ShardTableIDs struct { + ShardNode storage.ShardNode + TableIDs []storage.TableID + Version uint64 +} + +type GetShardTablesByNodeResult struct { + ShardTableIDs map[storage.ShardID]ShardTableIDs +} + +type GetShardNodesByTableIDsResult struct { + ShardNodes map[storage.TableID][]storage.ShardNode + Version map[storage.ShardID]uint64 +} + +type GetShardNodesResult struct { + shardNodes []storage.ShardNode + versions map[storage.ShardID]uint64 +} + +type CreateShardView struct { + ShardID storage.ShardID + Tables []storage.TableID +} + +// nolint +type TopologyManagerImpl struct { + storage storage.Storage + clusterID storage.ClusterID + shardIDAlloc id.Allocator + + // RWMutex is used to protect following fields. + lock sync.RWMutex + // ClusterView in memory. + clusterView storage.ClusterView + shardNodesMapping map[storage.ShardID][]storage.ShardNode // ShardID -> nodes of the shard + nodeShardsMapping map[string][]storage.ShardNode // nodeName -> shards of the node + // ShardView in memory. + shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> shardTopology + tableShardMapping map[storage.TableID]storage.ShardID // tableID -> ShardID + + // Node in memory. 
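+	// It mirrors the node records in storage and is refreshed by loadNode.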
+	nodes map[string]storage.Node
+}
+
+func NewTopologyManagerImpl(storage storage.Storage, clusterID storage.ClusterID, shardIDAlloc id.Allocator) TopologyManager {
+	return &TopologyManagerImpl{
+		storage:      storage,
+		clusterID:    clusterID,
+		shardIDAlloc: shardIDAlloc,
+	}
+}
+
+func (m *TopologyManagerImpl) Load(ctx context.Context) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	if err := m.loadClusterView(ctx); err != nil {
+		return errors.WithMessage(err, "load cluster view")
+	}
+
+	if err := m.loadShardView(ctx); err != nil {
+		return errors.WithMessage(err, "load shard view")
+	}
+
+	if err := m.loadNode(ctx); err != nil {
+		return errors.WithMessage(err, "load node")
+	}
+	return nil
+}
+
+func (m *TopologyManagerImpl) GetVersion() uint64 {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	return m.clusterView.Version
+}
+
+func (m *TopologyManagerImpl) GetClusterState() storage.ClusterState {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	return m.clusterView.State
+}
+
+func (m *TopologyManagerImpl) GetTableIDs(shardIDs []storage.ShardID, nodeName string) map[storage.ShardID]ShardTableIDs {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	shardTableIDs := make(map[storage.ShardID]ShardTableIDs, len(shardIDs))
+	for _, shardID := range shardIDs {
+		for _, shardNode := range m.shardNodesMapping[shardID] {
+			if shardNode.Node == nodeName {
+				shardView := m.shardTablesMapping[shardID]
+
+				shardTableIDs[shardID] = ShardTableIDs{
+					ShardNode: shardNode,
+					TableIDs:  shardView.TableIDs,
+					Version:   shardView.Version,
+				}
+				break
+			}
+		}
+	}
+
+	return shardTableIDs
+}
+
+func (m *TopologyManagerImpl) AddTable(ctx context.Context, nodeName string, table storage.Table) (ShardVersionUpdate, error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	// Pick up one shard to contain the table.
+	shardNodes := m.nodeShardsMapping[nodeName]
+	var shardIDs []storage.ShardID
+	for _, shardNode := range shardNodes {
+		if shardNode.Node == nodeName && shardNode.ShardRole == storage.Leader {
+			shardIDs = append(shardIDs, shardNode.ID)
+		}
+	}
+	if len(shardIDs) == 0 {
+		return ShardVersionUpdate{}, ErrShardNotFound.WithCausef("no leader shard found on node:%s", nodeName)
+	}
+	idx := rand.Int31n(int32(len(shardIDs))) // #nosec G404
+	shardID := shardIDs[idx]
+
+	shardView := m.shardTablesMapping[shardID]
+	prevVersion := shardView.Version
+
+	tableIDs := make([]storage.TableID, len(shardView.TableIDs), len(shardView.TableIDs)+1)
+	copy(tableIDs, shardView.TableIDs)
+	tableIDs = append(tableIDs, table.ID)
+	newShardView := storage.ShardView{
+		ShardID:   shardID,
+		Version:   prevVersion + 1,
+		TableIDs:  tableIDs,
+		CreatedAt: uint64(time.Now().UnixMilli()),
+	}
+
+	// Update shard view in storage.
+	err := m.storage.UpdateShardView(ctx, storage.PutShardViewRequest{
+		ClusterID:     m.clusterID,
+		ShardView:     newShardView,
+		LatestVersion: prevVersion,
+	})
+	if err != nil {
+		return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view")
+	}
+
+	// Update shard view in memory.
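+	// The storage update above has already succeeded, so the new view can be published to the caches.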
+ m.shardTablesMapping[shardID] = &newShardView + m.tableShardMapping[table.ID] = shardID + + return ShardVersionUpdate{ + ShardID: shardID, + CurrVersion: prevVersion + 1, + PrevVersion: prevVersion, + }, nil +} + +func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, tableID storage.TableID) (ShardVersionUpdate, error) { + m.lock.Lock() + defer m.lock.Unlock() + + shardID, ok := m.tableShardMapping[tableID] + if !ok { + return ShardVersionUpdate{}, ErrTableNotFound.WithCausef("table id:%d", tableID) + } + + shardView, ok := m.shardTablesMapping[shardID] + if !ok { + return ShardVersionUpdate{}, ErrShardNotFound.WithCausef("shard id:%d", shardID) + } + prevVersion := shardView.Version + + tableIDs := make([]storage.TableID, 0, len(shardView.TableIDs)) + for _, id := range shardView.TableIDs { + if id != tableID { + tableIDs = append(tableIDs, id) + } + } + + // Update shardView in storage. + if err := m.storage.UpdateShardView(ctx, storage.PutShardViewRequest{ + ClusterID: m.clusterID, + ShardView: storage.ShardView{ + ShardID: shardView.ShardID, + Version: prevVersion + 1, + TableIDs: tableIDs, + CreatedAt: uint64(time.Now().UnixMilli()), + }, + LatestVersion: prevVersion, + }); err != nil { + return ShardVersionUpdate{}, errors.WithMessage(err, "update shard view in storage") + } + + // Update shardView in memory. + shardView.Version = prevVersion + 1 + shardView.TableIDs = tableIDs + + return ShardVersionUpdate{ + ShardID: shardID, + CurrVersion: prevVersion + 1, + PrevVersion: prevVersion, + }, nil +} + +func (m *TopologyManagerImpl) GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error) { + m.lock.RLock() + defer m.lock.RUnlock() + + shardNodes, ok := m.shardNodesMapping[shardID] + if !ok { + return nil, ErrShardNotFound.WithCausef("shard id:%d", shardID) + } + + return shardNodes, nil +} + +func (m *TopologyManagerImpl) GetShardNodesByTableIDs(tableIDs []storage.TableID) (GetShardNodesByTableIDsResult, error) { + m.lock.RLock() + defer m.lock.RUnlock() + + tableShardNodes := make(map[storage.TableID][]storage.ShardNode, len(tableIDs)) + shardViewVersions := make(map[storage.ShardID]uint64, 0) + for _, tableID := range tableIDs { + shardID, ok := m.tableShardMapping[tableID] + if !ok { + return GetShardNodesByTableIDsResult{}, ErrTableNotFound.WithCausef("table id:%d", tableID) + } + + shardNodes, ok := m.shardNodesMapping[shardID] + if !ok { + return GetShardNodesByTableIDsResult{}, ErrShardNotFound.WithCausef("shard id:%d", shardID) + } + tableShardNodes[tableID] = shardNodes + + _, ok = shardViewVersions[shardID] + if !ok { + shardViewVersions[shardID] = m.shardTablesMapping[shardID].Version + } + } + + return GetShardNodesByTableIDsResult{ + ShardNodes: tableShardNodes, + Version: shardViewVersions, + }, nil +} + +func (m *TopologyManagerImpl) GetShardNodes() GetShardNodesResult { + m.lock.RLock() + defer m.lock.RUnlock() + + shardNodes := make([]storage.ShardNode, 0, len(m.shardNodesMapping)) + shardViewVersions := make(map[storage.ShardID]uint64, len(m.shardTablesMapping)) + for _, shardNode := range m.shardNodesMapping { + shardNodes = append(shardNodes, shardNode...) 
+ } + for shardID, shardView := range m.shardTablesMapping { + shardViewVersions[shardID] = shardView.Version + } + + return GetShardNodesResult{ + shardNodes: shardNodes, + versions: shardViewVersions, + } +} + +func (m *TopologyManagerImpl) InitClusterView(ctx context.Context) error { + clusterView := storage.ClusterView{ + ClusterID: m.clusterID, + Version: 0, + State: storage.Empty, + ShardNodes: nil, + CreatedAt: uint64(time.Now().UnixMilli()), + } + + err := m.storage.CreateClusterView(ctx, storage.CreateClusterViewRequest{ClusterView: clusterView}) + if err != nil { + return errors.WithMessage(err, "create cluster view") + } + return nil +} + +func (m *TopologyManagerImpl) UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error { + m.lock.Lock() + defer m.lock.Unlock() + + // Update cluster view in storage. + newClusterView := storage.ClusterView{ + ClusterID: m.clusterID, + Version: m.clusterView.Version + 1, + State: state, + ShardNodes: shardNodes, + CreatedAt: uint64(time.Now().UnixMilli()), + } + if err := m.storage.UpdateClusterView(ctx, storage.UpdateClusterViewRequest{ + ClusterID: m.clusterID, + ClusterView: newClusterView, + LatestVersion: m.clusterView.Version, + }); err != nil { + return errors.WithMessage(err, "update cluster view") + } + + // Load cluster view into memory. + if err := m.loadClusterView(ctx); err != nil { + return errors.WithMessage(err, "load cluster view") + } + return nil +} + +func (m *TopologyManagerImpl) CreateShardViews(ctx context.Context, createShardViews []CreateShardView) error { + m.lock.Lock() + defer m.lock.Unlock() + + // Create shard view in storage. + shardViews := make([]storage.ShardView, 0, len(createShardViews)) + for _, createShardView := range createShardViews { + shardViews = append(shardViews, storage.ShardView{ + ShardID: createShardView.ShardID, + Version: 0, + TableIDs: createShardView.Tables, + CreatedAt: uint64(time.Now().UnixMilli()), + }) + } + if err := m.storage.CreateShardViews(ctx, storage.CreateShardViewsRequest{ + ClusterID: m.clusterID, + ShardViews: shardViews, + }); err != nil { + return errors.WithMessage(err, "create shard view") + } + + // Load shard view into memory. 
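+	// Rebuild the cached mappings from storage so they include the newly created shard views.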
+ if err := m.loadShardView(ctx); err != nil { + return errors.WithMessage(err, "load shard view") + } + return nil +} + +func (m *TopologyManagerImpl) loadClusterView(ctx context.Context) error { + clusterViewResult, err := m.storage.GetClusterView(ctx, storage.GetClusterViewRequest{ + ClusterID: m.clusterID, + }) + if err != nil { + return errors.WithMessage(err, "get cluster view") + } + + m.shardNodesMapping = make(map[storage.ShardID][]storage.ShardNode, len(clusterViewResult.ClusterView.ShardNodes)) + m.nodeShardsMapping = make(map[string][]storage.ShardNode, len(clusterViewResult.ClusterView.ShardNodes)) + for _, shardNode := range clusterViewResult.ClusterView.ShardNodes { + m.shardNodesMapping[shardNode.ID] = append(m.shardNodesMapping[shardNode.ID], shardNode) + m.nodeShardsMapping[shardNode.Node] = append(m.nodeShardsMapping[shardNode.Node], shardNode) + } + m.clusterView = clusterViewResult.ClusterView + + return nil +} + +func (m *TopologyManagerImpl) loadShardView(ctx context.Context) error { + shardIDs := make([]storage.ShardID, 0, len(m.shardNodesMapping)) + for id := range m.shardNodesMapping { + shardIDs = append(shardIDs, id) + } + + shardViewsResult, err := m.storage.ListShardViews(ctx, storage.ListShardViewsRequest{ + ClusterID: m.clusterID, + ShardIDs: shardIDs, + }) + if err != nil { + return errors.WithMessage(err, "list shard views") + } + + m.shardTablesMapping = make(map[storage.ShardID]*storage.ShardView, len(shardViewsResult.ShardViews)) + m.tableShardMapping = make(map[storage.TableID]storage.ShardID, 0) + for _, shardView := range shardViewsResult.ShardViews { + view := shardView + m.shardTablesMapping[shardView.ShardID] = &view + for _, tableID := range shardView.TableIDs { + m.tableShardMapping[tableID] = shardView.ShardID + } + } + + return nil +} + +func (m *TopologyManagerImpl) loadNode(ctx context.Context) error { + nodesResult, err := m.storage.ListNodes(ctx, storage.ListNodesRequest{ClusterID: m.clusterID}) + if err != nil { + return errors.WithMessage(err, "list nodes") + } + + m.nodes = make(map[string]storage.Node, len(nodesResult.Nodes)) + for _, node := range nodesResult.Nodes { + m.nodes[node.Name] = node + } + + return nil +} diff --git a/server/cluster/types.go b/server/cluster/types.go index 08a58033..849f472a 100644 --- a/server/cluster/types.go +++ b/server/cluster/types.go @@ -5,23 +5,33 @@ package cluster import ( "github.com/CeresDB/ceresdbproto/pkg/clusterpb" "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" + "github.com/CeresDB/ceresmeta/server/storage" ) +const ( + MinShardID = 0 +) + +type TableWitchSchema struct { + Table storage.Table + Schema storage.Schema +} + type TableInfo struct { - ID uint64 + ID storage.TableID Name string - SchemaID uint32 + SchemaID storage.SchemaID SchemaName string } type ShardTables struct { - Shard *ShardInfo - Tables []*TableInfo + Shard ShardInfo + Tables []TableInfo } type ShardInfo struct { - ID uint32 - Role clusterpb.ShardRole + ID storage.ShardID + Role storage.ShardRole Version uint64 } @@ -30,54 +40,62 @@ type ShardsOfNode struct { ShardIDs []uint32 } -type NodeShard struct { - Endpoint string - ShardInfo *ShardInfo +type ShardNodeWithVersion struct { + version uint64 + ShardNode storage.ShardNode } type CreateTableResult struct { - Table *Table - ShardVersionUpdate *ShardVersionUpdate + Table storage.Table + ShardVersionUpdate ShardVersionUpdate } type DropTableResult struct { - ShardVersionUpdate *ShardVersionUpdate + ShardVersionUpdate ShardVersionUpdate } type ShardVersionUpdate struct 
{ - ShardID uint32 + ShardID storage.ShardID CurrVersion uint64 PrevVersion uint64 } type RouteEntry struct { - Table *TableInfo - NodeShards []*NodeShard + Table TableInfo + NodeShards []ShardNodeWithVersion } type RouteTablesResult struct { - Version uint64 - RouteEntries map[string]*RouteEntry + ClusterViewVersion uint64 + RouteEntries map[string]RouteEntry } type GetNodeShardsResult struct { ClusterTopologyVersion uint64 - NodeShards []*NodeShard + NodeShards []ShardNodeWithVersion } -func ConvertShardsInfoToPB(shard *ShardInfo) *metaservicepb.ShardInfo { +func ConvertShardsInfoToPB(shard ShardInfo) *metaservicepb.ShardInfo { return &metaservicepb.ShardInfo{ - Id: shard.ID, - Role: shard.Role, + Id: uint32(shard.ID), + Role: clusterpb.ShardRole(shard.Role), + Version: shard.Version, + } +} + +func ConvertShardsInfoPB(shard *metaservicepb.ShardInfo) ShardInfo { + return ShardInfo{ + ID: storage.ShardID(shard.Id), + Role: storage.ConvertShardRolePB(shard.Role), Version: shard.Version, } } -func ConvertTableInfoToPB(table *TableInfo) *metaservicepb.TableInfo { +func ConvertTableInfoToPB(table TableInfo) *metaservicepb.TableInfo { return &metaservicepb.TableInfo{ - Id: table.ID, + Id: uint64(table.ID), Name: table.Name, - SchemaId: table.SchemaID, + SchemaId: uint32(table.SchemaID), SchemaName: table.SchemaName, } } diff --git a/server/coordinator/eventdispatch/dispatch.go b/server/coordinator/eventdispatch/dispatch.go index 28fbb136..30afded2 100644 --- a/server/coordinator/eventdispatch/dispatch.go +++ b/server/coordinator/eventdispatch/dispatch.go @@ -9,14 +9,14 @@ import ( ) type Dispatch interface { - OpenShard(context context.Context, address string, request *OpenShardRequest) error - CloseShard(context context.Context, address string, request *CloseShardRequest) error - CreateTableOnShard(context context.Context, address string, request *CreateTableOnShardRequest) error - DropTableOnShard(context context.Context, address string, request *DropTableOnShardRequest) error + OpenShard(context context.Context, address string, request OpenShardRequest) error + CloseShard(context context.Context, address string, request CloseShardRequest) error + CreateTableOnShard(context context.Context, address string, request CreateTableOnShardRequest) error + DropTableOnShard(context context.Context, address string, request DropTableOnShardRequest) error } type OpenShardRequest struct { - Shard *cluster.ShardInfo + Shard cluster.ShardInfo } type CloseShardRequest struct { @@ -24,13 +24,13 @@ type CloseShardRequest struct { } type UpdateShardInfo struct { - CurrShardInfo *cluster.ShardInfo + CurrShardInfo cluster.ShardInfo PrevVersion uint64 } type CreateTableOnShardRequest struct { - UpdateShardInfo *UpdateShardInfo - TableInfo *cluster.TableInfo + UpdateShardInfo UpdateShardInfo + TableInfo cluster.TableInfo EncodedSchema []byte Engine string CreateIfNotExist bool @@ -38,6 +38,6 @@ type CreateTableOnShardRequest struct { } type DropTableOnShardRequest struct { - UpdateShardInfo *UpdateShardInfo - TableInfo *cluster.TableInfo + UpdateShardInfo UpdateShardInfo + TableInfo cluster.TableInfo } diff --git a/server/coordinator/eventdispatch/dispatch_impl.go b/server/coordinator/eventdispatch/dispatch_impl.go index 8c1e86b2..bfd3ea3f 100644 --- a/server/coordinator/eventdispatch/dispatch_impl.go +++ b/server/coordinator/eventdispatch/dispatch_impl.go @@ -24,7 +24,7 @@ func NewDispatchImpl() *DispatchImpl { return &DispatchImpl{} } -func (d *DispatchImpl) OpenShard(ctx context.Context, addr string, request 
*OpenShardRequest) error { +func (d *DispatchImpl) OpenShard(ctx context.Context, addr string, request OpenShardRequest) error { client, err := d.getMetaEventClient(ctx, addr) if err != nil { return err @@ -41,7 +41,7 @@ func (d *DispatchImpl) OpenShard(ctx context.Context, addr string, request *Open return nil } -func (d *DispatchImpl) CloseShard(ctx context.Context, addr string, request *CloseShardRequest) error { +func (d *DispatchImpl) CloseShard(ctx context.Context, addr string, request CloseShardRequest) error { client, err := d.getMetaEventClient(ctx, addr) if err != nil { return err @@ -58,7 +58,7 @@ func (d *DispatchImpl) CloseShard(ctx context.Context, addr string, request *Clo return nil } -func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, request *CreateTableOnShardRequest) error { +func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, request CreateTableOnShardRequest) error { client, err := d.getMetaEventClient(ctx, addr) if err != nil { return err @@ -73,7 +73,7 @@ func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, requ return nil } -func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, request *DropTableOnShardRequest) error { +func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, request DropTableOnShardRequest) error { client, err := d.getMetaEventClient(ctx, addr) if err != nil { return err @@ -109,7 +109,7 @@ func (d *DispatchImpl) getMetaEventClient(ctx context.Context, addr string) (met return metaeventpb.NewMetaEventServiceClient(client), nil } -func convertCreateTableOnShardRequestToPB(request *CreateTableOnShardRequest) *metaeventpb.CreateTableOnShardRequest { +func convertCreateTableOnShardRequestToPB(request CreateTableOnShardRequest) *metaeventpb.CreateTableOnShardRequest { return &metaeventpb.CreateTableOnShardRequest{ UpdateShardInfo: convertUpdateShardInfoToPB(request.UpdateShardInfo), TableInfo: cluster.ConvertTableInfoToPB(request.TableInfo), @@ -120,14 +120,14 @@ func convertCreateTableOnShardRequestToPB(request *CreateTableOnShardRequest) *m } } -func convertDropTableOnShardRequestToPB(request *DropTableOnShardRequest) *metaeventpb.DropTableOnShardRequest { +func convertDropTableOnShardRequestToPB(request DropTableOnShardRequest) *metaeventpb.DropTableOnShardRequest { return &metaeventpb.DropTableOnShardRequest{ UpdateShardInfo: convertUpdateShardInfoToPB(request.UpdateShardInfo), TableInfo: cluster.ConvertTableInfoToPB(request.TableInfo), } } -func convertUpdateShardInfoToPB(updateShardInfo *UpdateShardInfo) *metaeventpb.UpdateShardInfo { +func convertUpdateShardInfoToPB(updateShardInfo UpdateShardInfo) *metaeventpb.UpdateShardInfo { return &metaeventpb.UpdateShardInfo{ CurrShardInfo: cluster.ConvertShardsInfoToPB(updateShardInfo.CurrShardInfo), PrevVersion: updateShardInfo.PrevVersion, diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go index 23121daa..ec41e223 100644 --- a/server/coordinator/procedure/common_test.go +++ b/server/coordinator/procedure/common_test.go @@ -30,19 +30,19 @@ const ( type MockDispatch struct{} -func (m MockDispatch) OpenShard(_ context.Context, _ string, _ *eventdispatch.OpenShardRequest) error { +func (m MockDispatch) OpenShard(_ context.Context, _ string, _ eventdispatch.OpenShardRequest) error { return nil } -func (m MockDispatch) CloseShard(_ context.Context, _ string, _ *eventdispatch.CloseShardRequest) error { +func (m MockDispatch) CloseShard(_ 
context.Context, _ string, _ eventdispatch.CloseShardRequest) error { return nil } -func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ *eventdispatch.CreateTableOnShardRequest) error { +func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ eventdispatch.CreateTableOnShardRequest) error { return nil } -func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ *eventdispatch.DropTableOnShardRequest) error { +func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispatch.DropTableOnShardRequest) error { return nil } diff --git a/server/coordinator/procedure/create_drop_table_test.go b/server/coordinator/procedure/create_drop_table_test.go index 55f3bbba..d974c323 100644 --- a/server/coordinator/procedure/create_drop_table_test.go +++ b/server/coordinator/procedure/create_drop_table_test.go @@ -24,14 +24,14 @@ func TestCreateAndDropTable(t *testing.T) { }, SchemaName: testSchemaName, Name: testTableName, - }, func(_ *cluster.CreateTableResult) error { + }, func(_ cluster.CreateTableResult) error { return nil }, func(_ error) error { return nil }) err := procedure.Start(ctx) re.NoError(err) - table, b, err := c.GetTable(ctx, testSchemaName, testTableName) + table, b, err := c.GetTable(testSchemaName, testTableName) re.NoError(err) re.Equal(b, true) re.NotNil(table) @@ -42,17 +42,16 @@ func TestCreateAndDropTable(t *testing.T) { Node: nodeName0, ClusterName: clusterName, }, - SchemaName: table.GetSchemaName(), - Name: table.GetName(), - }, func(_ *cluster.TableInfo) error { + SchemaName: testSchemaName, + Name: table.Name, + }, func(_ cluster.TableInfo) error { return nil }, func(_ error) error { return nil }) err = procedure.Start(ctx) re.NoError(err) - table, b, err = c.GetTable(ctx, testSchemaName, testTableName) + table, b, err = c.GetTable(testSchemaName, testTableName) re.NoError(err) re.Equal(b, false) - re.Nil(table) } diff --git a/server/coordinator/procedure/create_table.go b/server/coordinator/procedure/create_table.go index 32e59235..f438cf59 100644 --- a/server/coordinator/procedure/create_table.go +++ b/server/coordinator/procedure/create_table.go @@ -6,7 +6,8 @@ import ( "context" "sync" - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/cluster" @@ -42,55 +43,58 @@ var ( func createTablePrepareCallback(event *fsm.Event) { req := event.Args[0].(*createTableCallbackRequest) - table, exists, err := req.cluster.GetTable(req.ctx, req.sourceReq.GetSchemaName(), req.sourceReq.GetName()) + _, exists, err := req.cluster.GetTable(req.sourceReq.GetSchemaName(), req.sourceReq.GetName()) if err != nil { cancelEventWithLog(event, err, "cluster get table") return } if exists { - req.createTableResult = &cluster.CreateTableResult{ - Table: table, - ShardVersionUpdate: &cluster.ShardVersionUpdate{ - ShardID: table.GetShardID(), - }, - } log.Warn("create an existing table", zap.String("schema", req.sourceReq.GetSchemaName()), zap.String("table", req.sourceReq.GetName())) + cancelEventWithLog(event, ErrTableAlreadyExists, "cluster get table") return } createTableResult, err := req.cluster.CreateTable(req.ctx, req.sourceReq.GetHeader().GetNode(), req.sourceReq.GetSchemaName(), req.sourceReq.GetName()) if err != nil { - cancelEventWithLog(event, err, "cluster get table") + cancelEventWithLog(event, err, "cluster create table") return } - shard, err := 
req.cluster.GetShardByID(createTableResult.Table.GetShardID()) + shardNodes, err := req.cluster.GetShardNodesByShardID(createTableResult.ShardVersionUpdate.ShardID) if err != nil { - cancelEventWithLog(event, err, "cluster get shard by id") + cancelEventWithLog(event, err, "cluster get shardNode by id") return } // TODO: consider followers - leader := shard.GetLeader() - if leader == nil { - cancelEventWithLog(event, ErrShardLeaderNotFound, "shard can't find leader") + leader := storage.ShardNode{} + found := false + for _, shardNode := range shardNodes { + if shardNode.ShardRole == storage.Leader { + found = true + leader = shardNode + break + } + } + if !found { + cancelEventWithLog(event, ErrShardLeaderNotFound, "shard node can't find leader") return } - err = req.dispatch.CreateTableOnShard(req.ctx, leader.Node, &eventdispatch.CreateTableOnShardRequest{ - UpdateShardInfo: &eventdispatch.UpdateShardInfo{ - CurrShardInfo: &cluster.ShardInfo{ + err = req.dispatch.CreateTableOnShard(req.ctx, leader.Node, eventdispatch.CreateTableOnShardRequest{ + UpdateShardInfo: eventdispatch.UpdateShardInfo{ + CurrShardInfo: cluster.ShardInfo{ ID: createTableResult.ShardVersionUpdate.ShardID, // TODO: dispatch CreateTableOnShard to followers? - Role: clusterpb.ShardRole_LEADER, + Role: storage.Leader, Version: createTableResult.ShardVersionUpdate.CurrVersion, }, PrevVersion: createTableResult.ShardVersionUpdate.PrevVersion, }, - TableInfo: &cluster.TableInfo{ - ID: createTableResult.Table.GetID(), - Name: createTableResult.Table.GetName(), - SchemaID: createTableResult.Table.GetSchemaID(), - SchemaName: createTableResult.Table.GetSchemaName(), + TableInfo: cluster.TableInfo{ + ID: createTableResult.Table.ID, + Name: createTableResult.Table.Name, + SchemaID: createTableResult.Table.SchemaID, + SchemaName: req.sourceReq.GetSchemaName(), }, EncodedSchema: req.sourceReq.EncodedSchema, Engine: req.sourceReq.Engine, @@ -98,7 +102,7 @@ func createTablePrepareCallback(event *fsm.Event) { Options: req.sourceReq.Options, }) if err != nil { - cancelEventWithLog(event, err, "dispatch create table on shard") + cancelEventWithLog(event, err, "dispatch create table on shardNode") return } @@ -120,7 +124,7 @@ func createTableFailedCallback(event *fsm.Event) { log.Error("exec failed callback failed") } - table, exists, err := req.cluster.GetTable(req.ctx, req.sourceReq.GetSchemaName(), req.sourceReq.GetName()) + table, exists, err := req.cluster.GetTable(req.sourceReq.GetSchemaName(), req.sourceReq.GetName()) if err != nil { log.Error("create table failed, get table failed", zap.String("schemaName", req.sourceReq.GetSchemaName()), zap.String("tableName", req.sourceReq.GetName())) return @@ -130,9 +134,9 @@ func createTableFailedCallback(event *fsm.Event) { } // Rollback, drop table in ceresmeta. 
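+	// The table was registered in metadata during the prepare callback, so remove it again to keep ceresmeta consistent with the data nodes.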
- _, err = req.cluster.DropTable(req.ctx, table.GetSchemaName(), table.GetName()) + _, err = req.cluster.DropTable(req.ctx, req.sourceReq.GetSchemaName(), table.Name) if err != nil { - log.Error("drop table failed, get table failed", zap.String("schemaName", table.GetSchemaName()), zap.String("tableName", table.GetName()), zap.Uint64("tableID", table.GetID())) + log.Error("drop table failed, get table failed", zap.String("schemaName", req.sourceReq.GetSchemaName()), zap.String("tableName", table.Name), zap.Uint64("tableID", uint64(table.ID))) return } } @@ -145,13 +149,13 @@ type createTableCallbackRequest struct { sourceReq *metaservicepb.CreateTableRequest - onSucceeded func(*cluster.CreateTableResult) error + onSucceeded func(cluster.CreateTableResult) error onFailed func(error) error - createTableResult *cluster.CreateTableResult + createTableResult cluster.CreateTableResult } -func NewCreateTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.CreateTableRequest, onSucceeded func(*cluster.CreateTableResult) error, onFailed func(error) error) Procedure { +func NewCreateTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.CreateTableRequest, onSucceeded func(cluster.CreateTableResult) error, onFailed func(error) error) Procedure { fsm := fsm.NewFSM( stateCreateTableBegin, createTableEvents, @@ -168,7 +172,7 @@ type CreateTableProcedure struct { req *metaservicepb.CreateTableRequest - onSucceeded func(*cluster.CreateTableResult) error + onSucceeded func(cluster.CreateTableResult) error onFailed func(error) error lock sync.RWMutex diff --git a/server/coordinator/procedure/drop_table.go b/server/coordinator/procedure/drop_table.go index 0edb33a0..7ca6ed26 100644 --- a/server/coordinator/procedure/drop_table.go +++ b/server/coordinator/procedure/drop_table.go @@ -6,7 +6,8 @@ import ( "context" "sync" - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/cluster" @@ -42,7 +43,7 @@ var ( func dropTablePrepareCallback(event *fsm.Event) { request := event.Args[0].(*dropTableCallbackRequest) - table, exists, err := request.cluster.GetTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName()) + table, exists, err := request.cluster.GetTable(request.rawReq.GetSchemaName(), request.rawReq.GetName()) if err != nil { cancelEventWithLog(event, err, "cluster get table") return @@ -57,46 +58,63 @@ func dropTablePrepareCallback(event *fsm.Event) { return } - shard, err := request.cluster.GetShardByID(table.GetShardID()) + shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID}) if err != nil { - cancelEventWithLog(event, err, "cluster get shard by id") + cancelEventWithLog(event, err, "cluster get shard by table id") return } + + shardNodes, ok := shardNodesResult.ShardNodes[table.ID] + if !ok { + cancelEventWithLog(event, ErrShardLeaderNotFound, "cluster get shard by table id") + return + } + // TODO: consider followers - leader := shard.GetLeader() - if leader == nil { + leader := storage.ShardNode{} + found := false + for _, shardNode := range shardNodes { + if shardNode.ShardRole == storage.Leader { + found = true + leader = shardNode + break + } + } + + if !found { cancelEventWithLog(event, ErrShardLeaderNotFound, "can't find leader") return } - err = 
request.dispatch.DropTableOnShard(request.ctx, leader.Node, &eventdispatch.DropTableOnShardRequest{ - UpdateShardInfo: &eventdispatch.UpdateShardInfo{ - CurrShardInfo: &cluster.ShardInfo{ + tableInfo := cluster.TableInfo{ + ID: table.ID, + Name: table.Name, + SchemaID: table.SchemaID, + SchemaName: request.rawReq.GetSchemaName(), + } + err = request.dispatch.DropTableOnShard(request.ctx, leader.Node, eventdispatch.DropTableOnShardRequest{ + UpdateShardInfo: eventdispatch.UpdateShardInfo{ + CurrShardInfo: cluster.ShardInfo{ ID: result.ShardVersionUpdate.ShardID, - Role: clusterpb.ShardRole_LEADER, + Role: storage.Leader, Version: result.ShardVersionUpdate.CurrVersion, }, PrevVersion: result.ShardVersionUpdate.PrevVersion, }, - TableInfo: &cluster.TableInfo{ - ID: table.GetID(), - Name: table.GetName(), - SchemaID: table.GetSchemaID(), - SchemaName: table.GetSchemaName(), - }, + TableInfo: tableInfo, }) if err != nil { cancelEventWithLog(event, err, "dispatch drop table on shard") return } - request.ret = table.GetInfo() + request.ret = tableInfo } func dropTableSuccessCallback(event *fsm.Event) { req := event.Args[0].(*dropTableCallbackRequest) - if err := req.onSucceeded(&req.ret); err != nil { + if err := req.onSucceeded(req.ret); err != nil { log.Error("exec success callback failed") } } @@ -117,13 +135,13 @@ type dropTableCallbackRequest struct { rawReq *metaservicepb.DropTableRequest - onSucceeded func(*cluster.TableInfo) error + onSucceeded func(cluster.TableInfo) error onFailed func(error) error ret cluster.TableInfo } -func NewDropTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.DropTableRequest, onSucceeded func(*cluster.TableInfo) error, onFailed func(error) error) Procedure { +func NewDropTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.DropTableRequest, onSucceeded func(cluster.TableInfo) error, onFailed func(error) error) Procedure { fsm := fsm.NewFSM( stateDropTableBegin, dropTableEvents, @@ -139,7 +157,7 @@ type DropTableProcedure struct { dispatch eventdispatch.Dispatch req *metaservicepb.DropTableRequest - onSucceeded func(*cluster.TableInfo) error + onSucceeded func(cluster.TableInfo) error onFailed func(error) error lock sync.RWMutex diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go index 8c335b08..540317ff 100644 --- a/server/coordinator/procedure/factory.go +++ b/server/coordinator/procedure/factory.go @@ -5,6 +5,8 @@ package procedure import ( "context" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/server/cluster" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" @@ -19,14 +21,14 @@ type Factory struct { type ScatterRequest struct { Cluster *cluster.Cluster - ShardIDs []uint32 + ShardIDs []storage.ShardID } type CreateTableRequest struct { Cluster *cluster.Cluster SourceReq *metaservicepb.CreateTableRequest - OnSucceeded func(*cluster.CreateTableResult) error + OnSucceeded func(cluster.CreateTableResult) error OnFailed func(error) error } @@ -34,7 +36,7 @@ type DropTableRequest struct { Cluster *cluster.Cluster SourceReq *metaservicepb.DropTableRequest - OnSucceeded func(*cluster.TableInfo) error + OnSucceeded func(cluster.TableInfo) error OnFailed func(error) error } diff --git a/server/coordinator/procedure/scatter.go b/server/coordinator/procedure/scatter.go index f36e0697..10f59229 100644 --- 
a/server/coordinator/procedure/scatter.go +++ b/server/coordinator/procedure/scatter.go @@ -7,7 +7,8 @@ import ( "sync" "time" - "github.com/CeresDB/ceresdbproto/pkg/clusterpb" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/cluster" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" @@ -49,7 +50,7 @@ func scatterPrepareCallback(event *fsm.Event) { waitForNodesReady(c) - if c.GetClusterState() == clusterpb.ClusterTopology_STABLE { + if c.GetClusterState() == storage.Stable { return } @@ -57,31 +58,35 @@ func scatterPrepareCallback(event *fsm.Event) { shardTotal := c.GetTotalShardNum() minNodeCount := c.GetClusterMinNodeCount() - shards, err := allocNodeShards(shardTotal, minNodeCount, registeredNodes, request.shardIDs) + shardNodes, err := allocNodeShards(shardTotal, minNodeCount, registeredNodes, request.shardIDs) if err != nil { - cancelEventWithLog(event, err, "alloc node shards failed") + cancelEventWithLog(event, err, "alloc node shardNodes failed") return } - shardViews := make([]*clusterpb.ShardTopology, 0, len(shards)) - for _, shard := range shards { - shardViews = append(shardViews, &clusterpb.ShardTopology{ - ShardId: shard.GetId(), - TableIds: []uint64{}, - Version: 0, + shardViews := make([]cluster.CreateShardView, 0, shardTotal) + for _, shard := range shardNodes { + shardViews = append(shardViews, cluster.CreateShardView{ + ShardID: shard.ID, + Tables: []storage.TableID{}, }) } - if err := c.CreateShardViews(ctx, clusterpb.ClusterTopology_STABLE, shardViews, shards); err != nil { - cancelEventWithLog(event, err, "create shard topologies failed") + if err := c.UpdateClusterView(ctx, storage.Stable, shardNodes); err != nil { + cancelEventWithLog(event, err, "update cluster view") + return + } + + if err := c.CreateShardViews(ctx, shardViews); err != nil { + cancelEventWithLog(event, err, "create shard views") return } - for _, shard := range shards { - openShardRequest := &eventdispatch.OpenShardRequest{ - Shard: &cluster.ShardInfo{ - ID: shard.GetId(), - Role: clusterpb.ShardRole_LEADER, + for _, shard := range shardNodes { + openShardRequest := eventdispatch.OpenShardRequest{ + Shard: cluster.ShardInfo{ + ID: shard.ID, + Role: storage.Leader, Version: 0, }, } @@ -110,7 +115,7 @@ func waitForNodesReady(c *cluster.Cluster) { } // Compute the total number of the available nodes. -func computeOnlineNodeNum(nodes []*cluster.RegisteredNode) uint32 { +func computeOnlineNodeNum(nodes []cluster.RegisteredNode) uint32 { onlineNodeNum := uint32(0) for _, node := range nodes { if node.IsOnline() { @@ -121,8 +126,8 @@ func computeOnlineNodeNum(nodes []*cluster.RegisteredNode) uint32 { } // Allocates shard ids across the registered nodes, and caller should ensure `minNodeCount <= len(allNodes)`. 
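+// e.g. with shardTotal=3 and minNodeCount=2, two shards land on node0 and one on node1 (see TestAllocNodeShard).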
-func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []*cluster.RegisteredNode, shardIDs []uint32) ([]*clusterpb.Shard, error) { - shards := make([]*clusterpb.Shard, 0, shardTotal) +func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []cluster.RegisteredNode, shardIDs []storage.ShardID) ([]storage.ShardNode, error) { + shards := make([]storage.ShardNode, 0, shardTotal) perNodeShardCount := shardTotal / minNodeCount if shardTotal%minNodeCount != 0 { @@ -134,9 +139,9 @@ func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []*cluster if uint32(len(shards)) < shardTotal { shardID := shardIDs[len(shards)] // TODO: consider nodesCache state - shards = append(shards, &clusterpb.Shard{ - Id: shardID, - ShardRole: clusterpb.ShardRole_LEADER, + shards = append(shards, storage.ShardNode{ + ID: shardID, + ShardRole: storage.Leader, Node: allNodes[i].GetMeta().GetName(), }) } @@ -159,10 +164,10 @@ type ScatterCallbackRequest struct { cluster *cluster.Cluster ctx context.Context dispatch eventdispatch.Dispatch - shardIDs []uint32 + shardIDs []storage.ShardID } -func NewScatterProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, shardIDs []uint32) Procedure { +func NewScatterProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, shardIDs []storage.ShardID) Procedure { scatterProcedureFsm := fsm.NewFSM( stateScatterBegin, scatterEvents, @@ -177,7 +182,7 @@ type ScatterProcedure struct { fsm *fsm.FSM dispatch eventdispatch.Dispatch cluster *cluster.Cluster - shardIDs []uint32 + shardIDs []storage.ShardID lock sync.RWMutex state State diff --git a/server/coordinator/procedure/scatter_test.go b/server/coordinator/procedure/scatter_test.go index a276f4fc..609b2271 100644 --- a/server/coordinator/procedure/scatter_test.go +++ b/server/coordinator/procedure/scatter_test.go @@ -8,8 +8,9 @@ import ( "testing" "time" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresdbproto/pkg/clusterpb" - "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/server/cluster" "github.com/stretchr/testify/require" ) @@ -18,52 +19,47 @@ func newClusterAndRegisterNode(t *testing.T) *cluster.Cluster { re := require.New(t) ctx := context.Background() dispatch := MockDispatch{} - cluster := newTestCluster(ctx, t) - - nodeInfo1 := &metaservicepb.NodeInfo{ - Endpoint: nodeName0, - ShardInfos: nil, - } + c := newTestCluster(ctx, t) - totalShardNum := cluster.GetTotalShardNum() - shardIDs := make([]uint32, 0, totalShardNum) + totalShardNum := c.GetTotalShardNum() + shardIDs := make([]storage.ShardID, 0, totalShardNum) for i := uint32(0); i < totalShardNum; i++ { - shardIDs = append(shardIDs, i) + shardIDs = append(shardIDs, storage.ShardID(i)) } - p := NewScatterProcedure(dispatch, cluster, 1, shardIDs) + p := NewScatterProcedure(dispatch, c, 1, shardIDs) go func() { err := p.Start(ctx) re.NoError(err) }() // Cluster is empty, it should be return and do nothing - err := cluster.RegisterNode(ctx, nodeInfo1) + err := c.RegisterNode(ctx, storage.Node{ + Name: nodeName0, + }, []cluster.ShardInfo{}) re.NoError(err) - re.Equal(clusterpb.ClusterTopology_EMPTY, cluster.GetClusterState()) + re.Equal(storage.Empty, c.GetClusterState()) // Register two node, defaultNodeCount is satisfied, Initialize shard topology - nodeInfo2 := &metaservicepb.NodeInfo{ - Endpoint: nodeName1, - ShardInfos: nil, - } - err = cluster.RegisterNode(ctx, nodeInfo2) + err = c.RegisterNode(ctx, storage.Node{ + Name: 
nodeName1, + }, []cluster.ShardInfo{}) re.NoError(err) - return cluster + return c } func checkScatterWithCluster(t *testing.T, cluster *cluster.Cluster) { re := require.New(t) - re.Equal(clusterpb.ClusterTopology_STABLE, cluster.GetClusterState()) - shardViews, err := cluster.GetClusterShardView() + re.Equal(storage.Stable, cluster.GetClusterState()) + shardNodes, err := cluster.GetShardNodes() re.NoError(err) - re.Equal(len(shardViews), defaultShardTotal) - shardNodeMapping := make(map[string][]uint32, 0) - for _, shardView := range shardViews { - nodeName := shardView.GetNode() - shardID := shardView.GetId() + re.Equal(len(shardNodes), defaultShardTotal) + shardNodeMapping := make(map[string][]storage.ShardID, 0) + for _, shardNode := range shardNodes { + nodeName := shardNode.Node + shardID := shardNode.ID _, exists := shardNodeMapping[nodeName] if !exists { - shardNodeMapping[nodeName] = make([]uint32, 0) + shardNodeMapping[nodeName] = make([]storage.ShardID, 0) } shardNodeMapping[nodeName] = append(shardNodeMapping[nodeName], shardID) } @@ -84,17 +80,17 @@ func TestAllocNodeShard(t *testing.T) { minNodeCount := 4 shardTotal := 2 - nodeList := make([]*cluster.RegisteredNode, 0, minNodeCount) + nodeList := make([]cluster.RegisteredNode, 0, minNodeCount) for i := 0; i < minNodeCount; i++ { nodeMeta := &clusterpb.Node{ Name: fmt.Sprintf("node%d", i), } - node := cluster.NewRegisteredNode(nodeMeta, []*cluster.ShardInfo{}) + node := cluster.NewRegisteredNode(nodeMeta, []cluster.ShardInfo{}) nodeList = append(nodeList, node) } - shardIDs := make([]uint32, 0, shardTotal) + shardIDs := make([]storage.ShardID, 0, shardTotal) for i := uint32(0); i < uint32(shardTotal); i++ { - shardIDs = append(shardIDs, i) + shardIDs = append(shardIDs, storage.ShardID(i)) } // NodeCount = 4, shardTotal = 2 // Two shard distributed in node0,node1 @@ -106,19 +102,19 @@ func TestAllocNodeShard(t *testing.T) { minNodeCount = 2 shardTotal = 3 - nodeList = make([]*cluster.RegisteredNode, 0, minNodeCount) + nodeList = make([]cluster.RegisteredNode, 0, minNodeCount) for i := 0; i < minNodeCount; i++ { nodeMeta := &clusterpb.Node{ Name: fmt.Sprintf("node%d", i), } - node := cluster.NewRegisteredNode(nodeMeta, []*cluster.ShardInfo{}) + node := cluster.NewRegisteredNode(nodeMeta, []cluster.ShardInfo{}) nodeList = append(nodeList, node) } // NodeCount = 2, shardTotal = 3 // Three shard distributed in node0,node0,node1 - shardIDs = make([]uint32, 0, shardTotal) + shardIDs = make([]storage.ShardID, 0, shardTotal) for i := uint32(0); i < uint32(shardTotal); i++ { - shardIDs = append(shardIDs, i) + shardIDs = append(shardIDs, storage.ShardID(i)) } shardView, err = allocNodeShards(uint32(shardTotal), uint32(minNodeCount), nodeList, shardIDs) re.NoError(err) diff --git a/server/coordinator/scheduler.go b/server/coordinator/scheduler.go index d2022288..8d86904d 100644 --- a/server/coordinator/scheduler.go +++ b/server/coordinator/scheduler.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/cluster" @@ -83,20 +85,24 @@ func (s *Scheduler) checkNode(ctx context.Context, ticker *time.Ticker) { log.Error("get node shards failed", zap.Error(err)) continue } - nodeShardsMapping := map[string][]*cluster.ShardInfo{} + nodeShardsMapping := map[string][]cluster.ShardInfo{} for _, nodeShard := range nodeShards.NodeShards { - _, exists := 
nodeShardsMapping[nodeShard.Endpoint] + _, exists := nodeShardsMapping[nodeShard.ShardNode.Node] if !exists { - nodeShardsMapping[nodeShard.Endpoint] = []*cluster.ShardInfo{} + nodeShardsMapping[nodeShard.ShardNode.Node] = []cluster.ShardInfo{} } - nodeShardsMapping[nodeShard.Endpoint] = append(nodeShardsMapping[nodeShard.Endpoint], nodeShard.ShardInfo) + nodeShardsMapping[nodeShard.ShardNode.Node] = append(nodeShardsMapping[nodeShard.ShardNode.Node], cluster.ShardInfo{ + ID: nodeShard.ShardNode.ID, + Role: nodeShard.ShardNode.ShardRole, + Version: nodeShards.ClusterTopologyVersion, + }) } s.processNodes(ctx, nodes, t, nodeShardsMapping) } } } -func (s *Scheduler) processNodes(ctx context.Context, nodes []*cluster.RegisteredNode, t time.Time, nodeShardsMapping map[string][]*cluster.ShardInfo) { +func (s *Scheduler) processNodes(ctx context.Context, nodes []cluster.RegisteredNode, t time.Time, nodeShardsMapping map[string][]cluster.ShardInfo) { for _, node := range nodes { // Determines whether node is expired. if !node.IsExpired(uint64(t.Unix()), heartbeatKeepAliveIntervalSec) { @@ -113,9 +119,9 @@ func (s *Scheduler) processNodes(ctx context.Context, nodes []*cluster.Registere // applyMetadataShardInfo verify shardInfo in heartbeats and metadata, they are forcibly synchronized to the latest version if they are inconsistent. // TODO: Encapsulate the following logic as a standalone ApplyProcedure. -func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, realShards []*cluster.ShardInfo, expectShards []*cluster.ShardInfo) error { - realShardInfoMapping := make(map[uint32]*cluster.ShardInfo, len(realShards)) - expectShardInfoMapping := make(map[uint32]*cluster.ShardInfo, len(expectShards)) +func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, realShards []cluster.ShardInfo, expectShards []cluster.ShardInfo) error { + realShardInfoMapping := make(map[storage.ShardID]cluster.ShardInfo, len(realShards)) + expectShardInfoMapping := make(map[storage.ShardID]cluster.ShardInfo, len(expectShards)) for _, realShard := range realShards { realShardInfoMapping[realShard.ID] = realShard } @@ -129,7 +135,7 @@ func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, rea // 1. Shard exists in metadata and not exists in node, reopen lack shards on node. if !exists { - if err := s.dispatch.OpenShard(ctx, node, &eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil { + if err := s.dispatch.OpenShard(ctx, node, eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil { return errors.WithMessagef(err, "reopen shard failed, shardInfo:%d", expectShard.ID) } continue @@ -141,10 +147,10 @@ func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, rea } // 3. Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node. 
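	// Forcing the node back to the metadata view at this step: the stale shard is first
	// closed by its numeric ID (the ShardID is converted to uint32 for the dispatch request),
	// then reopened with the expected ShardInfo taken from metadata.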
- if err := s.dispatch.CloseShard(ctx, node, &eventdispatch.CloseShardRequest{ShardID: expectShard.ID}); err != nil { + if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(expectShard.ID)}); err != nil { return errors.WithMessagef(err, "close shard failed, shardInfo:%d", expectShard.ID) } - if err := s.dispatch.OpenShard(ctx, node, &eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil { + if err := s.dispatch.OpenShard(ctx, node, eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil { return errors.WithMessagef(err, "reopen shard failed, shardInfo:%d", expectShard.ID) } } @@ -155,7 +161,7 @@ func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, rea if ok { continue } - if err := s.dispatch.CloseShard(ctx, node, &eventdispatch.CloseShardRequest{ShardID: realShard.ID}); err != nil { + if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(realShard.ID)}); err != nil { return errors.WithMessagef(err, "close shard failed, shardInfo:%d", realShard.ID) } } diff --git a/server/server.go b/server/server.go index 665d949d..e4f4298d 100644 --- a/server/server.go +++ b/server/server.go @@ -237,13 +237,13 @@ func (srv *Server) createDefaultCluster(ctx context.Context) error { log.Info("create default cluster succeed", zap.String("cluster", defaultCluster.Name())) } // Create and submit scatter procedure for default cluster. - shardIDs := make([]uint32, 0, defaultCluster.GetTotalShardNum()) + shardIDs := make([]storage.ShardID, 0, defaultCluster.GetTotalShardNum()) for i := uint32(0); i < defaultCluster.GetTotalShardNum(); i++ { shardID, err := defaultCluster.AllocShardID(ctx) if err != nil { return errors.WithMessage(err, "alloc shard id failed") } - shardIDs = append(shardIDs, shardID) + shardIDs = append(shardIDs, storage.ShardID(shardID)) } scatterRequest := &procedure.ScatterRequest{Cluster: defaultCluster, ShardIDs: shardIDs} scatterProcedure, err := srv.procedureFactory.CreateScatterProcedure(ctx, scatterRequest) @@ -285,10 +285,6 @@ func (srv *Server) GetProcedureManager() procedure.Manager { return srv.procedureManager } -func (srv *Server) ProcessHeartbeat(ctx context.Context, req *metaservicepb.NodeHeartbeatRequest) error { - return srv.clusterManager.RegisterNode(ctx, req.GetHeader().GetClusterName(), req.GetInfo()) -} - type leadershipEventCallbacks struct { srv *Server } diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 0535c837..b3e78913 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/CeresDB/ceresdbproto/pkg/clusterpb" "github.com/CeresDB/ceresdbproto/pkg/commonpb" "github.com/CeresDB/ceresdbproto/pkg/metaservicepb" @@ -57,7 +59,20 @@ func (s *Service) NodeHeartbeat(ctx context.Context, req *metaservicepb.NodeHear return ceresmetaClient.NodeHeartbeat(ctx, req) } - err = s.h.GetClusterManager().RegisterNode(ctx, req.GetHeader().GetClusterName(), req.GetInfo()) + shardInfos := make([]cluster.ShardInfo, 0, len(req.Info.ShardInfos)) + for _, shardInfo := range req.Info.ShardInfos { + shardInfos = append(shardInfos, cluster.ConvertShardsInfoPB(shardInfo)) + } + + err = s.h.GetClusterManager().RegisterNode(ctx, req.GetHeader().GetClusterName(), storage.Node{ + Name: req.Info.Endpoint, + NodeStats: storage.NodeStats{ + Lease: req.GetInfo().Lease, + Zone: req.GetInfo().Zone, + NodeVersion: 
req.GetInfo().BinaryVersion, + }, + LastTouchTime: uint64(time.Now().UnixMilli()), + }, shardInfos) if err != nil { return &metaservicepb.NodeHeartbeatResponse{Header: responseHeader(err, "grpc heartbeat")}, nil } @@ -87,7 +102,7 @@ func (s *Service) AllocSchemaID(ctx context.Context, req *metaservicepb.AllocSch return &metaservicepb.AllocSchemaIdResponse{ Header: okResponseHeader(), Name: req.GetName(), - Id: schemaID, + Id: uint32(schemaID), }, nil } @@ -103,7 +118,12 @@ func (s *Service) GetTablesOfShards(ctx context.Context, req *metaservicepb.GetT return ceresmetaClient.GetTablesOfShards(ctx, req) } - tables, err := s.h.GetClusterManager().GetTables(ctx, req.GetHeader().GetClusterName(), req.GetHeader().GetNode(), req.GetShardIds()) + shardIDs := make([]storage.ShardID, 0, len(req.GetShardIds())) + for _, shardID := range req.GetShardIds() { + shardIDs = append(shardIDs, storage.ShardID(shardID)) + } + + tables, err := s.h.GetClusterManager().GetTables(req.GetHeader().GetClusterName(), req.GetHeader().GetNode(), shardIDs) if err != nil { return &metaservicepb.GetTablesOfShardsResponse{Header: responseHeader(err, "grpc get tables of shards")}, nil } @@ -133,9 +153,9 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl } errorCh := make(chan error, 1) - resultCh := make(chan *cluster.CreateTableResult, 1) + resultCh := make(chan cluster.CreateTableResult, 1) - onSucceeded := func(ret *cluster.CreateTableResult) error { + onSucceeded := func(ret cluster.CreateTableResult) error { resultCh <- ret return nil } @@ -165,13 +185,13 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl return &metaservicepb.CreateTableResponse{ Header: okResponseHeader(), CreatedTable: &metaservicepb.TableInfo{ - Id: ret.Table.GetID(), - Name: ret.Table.GetName(), - SchemaId: ret.Table.GetSchemaID(), - SchemaName: ret.Table.GetSchemaName(), + Id: uint64(ret.Table.ID), + Name: ret.Table.Name, + SchemaId: uint32(ret.Table.SchemaID), + SchemaName: req.GetSchemaName(), }, ShardInfo: &metaservicepb.ShardInfo{ - Id: ret.ShardVersionUpdate.ShardID, + Id: uint32(ret.ShardVersionUpdate.ShardID), Role: clusterpb.ShardRole_LEADER, Version: ret.ShardVersionUpdate.CurrVersion, }, @@ -203,9 +223,9 @@ func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableReq } errorCh := make(chan error, 1) - resultCh := make(chan *cluster.TableInfo, 1) + resultCh := make(chan cluster.TableInfo, 1) - onSucceeded := func(ret *cluster.TableInfo) error { + onSucceeded := func(ret cluster.TableInfo) error { resultCh <- ret return nil } @@ -282,14 +302,14 @@ func (s *Service) GetNodes(ctx context.Context, req *metaservicepb.GetNodesReque return convertToGetNodesResponse(nodesResult), nil } -func convertToGetTablesOfShardsResponse(shardTables map[uint32]*cluster.ShardTables) *metaservicepb.GetTablesOfShardsResponse { +func convertToGetTablesOfShardsResponse(shardTables map[storage.ShardID]cluster.ShardTables) *metaservicepb.GetTablesOfShardsResponse { tablesByShard := make(map[uint32]*metaservicepb.TablesOfShard, len(shardTables)) for id, shardTable := range shardTables { tables := make([]*metaservicepb.TableInfo, 0, len(shardTable.Tables)) for _, table := range shardTable.Tables { tables = append(tables, cluster.ConvertTableInfoToPB(table)) } - tablesByShard[id] = &metaservicepb.TablesOfShard{ + tablesByShard[uint32(id)] = &metaservicepb.TablesOfShard{ ShardInfo: cluster.ConvertShardsInfoToPB(shardTable.Shard), Tables: tables, } @@ -300,16 +320,16 @@ func 
convertToGetTablesOfShardsResponse(shardTables map[uint32]*cluster.ShardTab } } -func convertRouteTableResult(routeTablesResult *cluster.RouteTablesResult) *metaservicepb.RouteTablesResponse { +func convertRouteTableResult(routeTablesResult cluster.RouteTablesResult) *metaservicepb.RouteTablesResponse { entries := make(map[string]*metaservicepb.RouteEntry, len(routeTablesResult.RouteEntries)) for tableName, entry := range routeTablesResult.RouteEntries { nodeShards := make([]*metaservicepb.NodeShard, 0, len(entry.NodeShards)) for _, nodeShard := range entry.NodeShards { nodeShards = append(nodeShards, &metaservicepb.NodeShard{ - Endpoint: nodeShard.Endpoint, + Endpoint: nodeShard.ShardNode.Node, ShardInfo: &metaservicepb.ShardInfo{ - Id: nodeShard.ShardInfo.ID, - Role: nodeShard.ShardInfo.Role, + Id: uint32(nodeShard.ShardNode.ID), + Role: clusterpb.ShardRole(nodeShard.ShardNode.ShardRole), }, }) } @@ -322,17 +342,20 @@ func convertRouteTableResult(routeTablesResult *cluster.RouteTablesResult) *meta return &metaservicepb.RouteTablesResponse{ Header: okResponseHeader(), - ClusterTopologyVersion: routeTablesResult.Version, + ClusterTopologyVersion: routeTablesResult.ClusterViewVersion, Entries: entries, } } -func convertToGetNodesResponse(nodesResult *cluster.GetNodeShardsResult) *metaservicepb.GetNodesResponse { +func convertToGetNodesResponse(nodesResult cluster.GetNodeShardsResult) *metaservicepb.GetNodesResponse { nodeShards := make([]*metaservicepb.NodeShard, 0, len(nodesResult.NodeShards)) - for _, nodeShard := range nodesResult.NodeShards { + for _, shardNodeWithVersion := range nodesResult.NodeShards { nodeShards = append(nodeShards, &metaservicepb.NodeShard{ - Endpoint: nodeShard.Endpoint, - ShardInfo: cluster.ConvertShardsInfoToPB(nodeShard.ShardInfo), + Endpoint: shardNodeWithVersion.ShardNode.Node, + ShardInfo: &metaservicepb.ShardInfo{ + Id: uint32(shardNodeWithVersion.ShardNode.ID), + Role: clusterpb.ShardRole(shardNodeWithVersion.ShardNode.ShardRole), + }, }) } return &metaservicepb.GetNodesResponse{ diff --git a/server/storage/meta.go b/server/storage/meta.go index 1eaa8c81..e082ac6b 100644 --- a/server/storage/meta.go +++ b/server/storage/meta.go @@ -20,7 +20,7 @@ type Storage interface { // GetClusterView get cluster view by cluster id. GetClusterView(ctx context.Context, req GetClusterViewRequest) (GetClusterViewResult, error) // UpdateClusterView update cluster view. - UpdateClusterView(ctx context.Context, req PutClusterViewRequest) error + UpdateClusterView(ctx context.Context, req UpdateClusterViewRequest) error // ListSchemas list all schemas in specified cluster. 
ListSchemas(ctx context.Context, req ListSchemasRequest) (ListSchemasResult, error) diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go index bad54853..09291c64 100644 --- a/server/storage/storage_impl.go +++ b/server/storage/storage_impl.go @@ -143,7 +143,7 @@ func (s *metaStorageImpl) GetClusterView(ctx context.Context, req GetClusterView }, nil } -func (s *metaStorageImpl) UpdateClusterView(ctx context.Context, req PutClusterViewRequest) error { +func (s *metaStorageImpl) UpdateClusterView(ctx context.Context, req UpdateClusterViewRequest) error { clusterViewPB := ConvertClusterViewToPB(req.ClusterView) value, err := proto.Marshal(&clusterViewPB) @@ -446,7 +446,7 @@ func (s *metaStorageImpl) ListNodes(ctx context.Context, req ListNodesRequest) ( if err := proto.Unmarshal(value, nodePB); err != nil { return ErrDecode.WithCausef("decode node, key:%s, value:%v, clusterID:%d, err:%v", key, value, req.ClusterID, err) } - node := convertNodePB(nodePB) + node := ConvertNodePB(nodePB) nodes = append(nodes, node) return nil } diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index 7760acc9..9657dc63 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -98,7 +98,7 @@ func TestStorage_CreateAndGetClusterView(t *testing.T) { // Test to put cluster view. expectClusterView.Version = uint64(1) - putReq := PutClusterViewRequest{ + putReq := UpdateClusterViewRequest{ ClusterID: defaultClusterID, ClusterView: expectClusterView, LatestVersion: 0, diff --git a/server/storage/types.go b/server/storage/types.go index 7d2446d7..73aff12e 100644 --- a/server/storage/types.go +++ b/server/storage/types.go @@ -51,7 +51,7 @@ type GetClusterViewResult struct { ClusterView ClusterView } -type PutClusterViewRequest struct { +type UpdateClusterViewRequest struct { ClusterID ClusterID ClusterView ClusterView LatestVersion uint64 @@ -247,7 +247,7 @@ func convertShardRoleToPB(role ShardRole) clusterpb.ShardRole { return clusterpb.ShardRole_FOLLOWER } -func convertShardRolePB(role clusterpb.ShardRole) ShardRole { +func ConvertShardRolePB(role clusterpb.ShardRole) ShardRole { switch role { case clusterpb.ShardRole_LEADER: return Leader @@ -270,7 +270,7 @@ func convertShardNodeToPB(shardNode ShardNode) clusterpb.Shard { func ConvertShardNodePB(shardNode *clusterpb.Shard) ShardNode { return ShardNode{ ID: ShardID(shardNode.Id), - ShardRole: convertShardRolePB(shardNode.ShardRole), + ShardRole: ConvertShardRolePB(shardNode.ShardRole), Node: shardNode.Node, } } @@ -434,7 +434,7 @@ func ConvertNodeToPB(node Node) clusterpb.Node { } } -func convertNodePB(node *clusterpb.Node) Node { +func ConvertNodePB(node *clusterpb.Node) Node { nodeStats := convertNodeStatsPB(node.NodeStats) return Node{ Name: node.Name,