Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

chore: add route http interface & fix some bugs #104

Merged
merged 7 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/cluster/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ var (
ErrShardNotFound = coderr.NewCodeError(coderr.NotFound, "shard not found")
ErrNodeNotFound = coderr.NewCodeError(coderr.NotFound, "NodeName not found")
ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists")
ErrRouteTable = coderr.NewCodeError(coderr.Internal, "route table")
)
6 changes: 4 additions & 2 deletions server/cluster/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,10 @@ func (m *TableManagerImpl) loadTable(ctx context.Context) error {
for _, table := range tablesResult.Tables {
tables, ok := m.schemaTables[table.SchemaID]
if !ok {
tables.tables = make(map[string]storage.Table, 0)
tables.tablesByID = make(map[storage.TableID]storage.Table, 0)
tables = &Tables{
tables: make(map[string]storage.Table, 0),
tablesByID: make(map[storage.TableID]storage.Table, 0),
}
}

tables.tables[table.Name] = table
Expand Down
8 changes: 4 additions & 4 deletions server/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package cluster

import (
"github.com/CeresDB/ceresdbproto/golang/pkg/clusterpb"
"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/server/storage"
)
Expand All @@ -25,8 +24,9 @@ type ShardTables struct {
}

type ShardInfo struct {
ID storage.ShardID
Role storage.ShardRole
ID storage.ShardID
Role storage.ShardRole
// ShardViewVersion
Version uint64
}

Expand Down Expand Up @@ -88,7 +88,7 @@ func (n RegisteredNode) IsExpired(now uint64, aliveThreshold uint64) bool {
func ConvertShardsInfoToPB(shard ShardInfo) *metaservicepb.ShardInfo {
return &metaservicepb.ShardInfo{
Id: uint32(shard.ID),
Role: clusterpb.ShardRole(shard.Role),
Role: storage.ConvertShardRoleToPB(shard.Role),
Version: shard.Version,
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *Scheduler) checkNode(ctx context.Context, ticker *time.Ticker) {
nodeShardsMapping[nodeShard.ShardNode.NodeName] = append(nodeShardsMapping[nodeShard.ShardNode.NodeName], cluster.ShardInfo{
ID: nodeShard.ShardNode.ID,
Role: nodeShard.ShardNode.ShardRole,
Version: nodeShards.ClusterTopologyVersion,
Version: nodeShard.ShardInfo.Version,
})
}
s.processNodes(ctx, nodes, t, nodeShardsMapping)
Expand Down
6 changes: 4 additions & 2 deletions server/service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (s *Service) NodeHeartbeat(ctx context.Context, req *metaservicepb.NodeHear
}, ShardInfos: shardInfos,
}

log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", req.Info.String()))

err = s.h.GetClusterManager().RegisterNode(ctx, req.GetHeader().GetClusterName(), registeredNode)
if err != nil {
return &metaservicepb.NodeHeartbeatResponse{Header: responseHeader(err, "grpc heartbeat")}, nil
Expand Down Expand Up @@ -332,7 +334,7 @@ func convertRouteTableResult(routeTablesResult cluster.RouteTablesResult) *metas
Endpoint: nodeShard.ShardNode.NodeName,
ShardInfo: &metaservicepb.ShardInfo{
Id: uint32(nodeShard.ShardNode.ID),
Role: clusterpb.ShardRole(nodeShard.ShardNode.ShardRole),
Role: storage.ConvertShardRoleToPB(nodeShard.ShardNode.ShardRole),
},
})
}
Expand All @@ -357,7 +359,7 @@ func convertToGetNodesResponse(nodesResult cluster.GetNodeShardsResult) *metaser
Endpoint: shardNodeWithVersion.ShardNode.NodeName,
ShardInfo: &metaservicepb.ShardInfo{
Id: uint32(shardNodeWithVersion.ShardNode.ID),
Role: clusterpb.ShardRole(shardNodeWithVersion.ShardNode.ShardRole),
Role: storage.ConvertShardRoleToPB(shardNodeWithVersion.ShardNode.ShardRole),
},
})
}
Expand Down
40 changes: 37 additions & 3 deletions server/service/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package http

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
Expand Down Expand Up @@ -43,6 +44,7 @@ func (a *API) NewAPIRouter() *Router {
router := New().WithPrefix(apiPrefix).WithInstrumentation(printRequestInsmt)

router.Post("/transferLeader", a.transferLeader)
router.Post("/route", a.route)

return router
}
Expand Down Expand Up @@ -120,7 +122,7 @@ func (a *API) transferLeader(writer http.ResponseWriter, req *http.Request) {
err := json.NewDecoder(req.Body).Decode(&transferLeaderRequest)
if err != nil {
log.Error("decode request body failed", zap.Error(err))
a.respondError(writer, ErrParseRequest, nil)
a.respondError(writer, ErrParseRequest, "decode request body failed")
return
}

Expand All @@ -132,15 +134,47 @@ func (a *API) transferLeader(writer http.ResponseWriter, req *http.Request) {
})
if err != nil {
log.Error("create transfer leader procedure", zap.Error(err))
a.respondError(writer, procedure.ErrCreateProcedure, nil)
a.respondError(writer, procedure.ErrCreateProcedure, "create transfer leader procedure")
return
}
err = a.procedureManager.Submit(req.Context(), transferLeaderProcedure)
if err != nil {
log.Error("submit transfer leader procedure", zap.Error(err))
a.respondError(writer, procedure.ErrSubmitProcedure, nil)
a.respondError(writer, procedure.ErrSubmitProcedure, "submit transfer leader procedure")
return
}

a.respond(writer, "ok")
}

type RouteRequest struct {
ClusterName string `json:"clusterName"`
SchemaName string `json:"schemaName"`
Tables []string `json:"table"`
}

func (a *API) route(writer http.ResponseWriter, req *http.Request) {
var routeRequest RouteRequest
err := json.NewDecoder(req.Body).Decode(&routeRequest)
if err != nil {
log.Error("decode request body failed", zap.Error(err))
ZuLiangWang marked this conversation as resolved.
Show resolved Hide resolved
a.respondError(writer, ErrParseRequest, "decode request body failed")
return
}

result, err := a.clusterManager.RouteTables(context.Background(), routeRequest.ClusterName, routeRequest.SchemaName, routeRequest.Tables)
if err != nil {
log.Error("route tables failed", zap.Error(err))
a.respondError(writer, cluster.ErrRouteTable, "route tables failed")
return
}

resultByte, err := json.Marshal(result)
if err != nil {
log.Error("route tables result marshal failed", zap.Error(err))
a.respondError(writer, ErrParseResponse, "route tables result marshal failed")
return
}

a.respond(writer, string(resultByte))
}
4 changes: 2 additions & 2 deletions server/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func convertClusterStatePB(state clusterpb.ClusterTopology_ClusterState) Cluster
return ClusterStateEmpty
}

func convertShardRoleToPB(role ShardRole) clusterpb.ShardRole {
func ConvertShardRoleToPB(role ShardRole) clusterpb.ShardRole {
switch role {
case ShardRoleLeader:
return clusterpb.ShardRole_LEADER
Expand All @@ -267,7 +267,7 @@ func convertShardRoleToPB(role ShardRole) clusterpb.ShardRole {
func convertShardNodeToPB(shardNode ShardNode) clusterpb.Shard {
return clusterpb.Shard{
Id: uint32(shardNode.ID),
ShardRole: convertShardRoleToPB(shardNode.ShardRole),
ShardRole: ConvertShardRoleToPB(shardNode.ShardRole),
Node: shardNode.NodeName,
}
}
Expand Down