refactor: reformat the query result and fix some problem (#11)
foreverneverer committed Dec 4, 2020
1 parent 3a85065 commit fadd412
Showing 12 changed files with 196 additions and 150 deletions.
4 changes: 2 additions & 2 deletions cmd/list_tables.go
@@ -33,10 +33,10 @@ func init() {
         Help: "list all tables in the cluster",
         Flags: func(f *grumble.Flags) {
             /*define the flags*/
-            f.Bool("j", "json", false, "Use JSON as the format of the output results. By default tabular format is used.")
+            f.BoolL("drop", false, "only show dropped table information")
         },
         Run: func(c *grumble.Context) error {
-            return executor.ListTables(pegasusClient, c.Flags.Bool("json"))
+            return executor.ListTables(pegasusClient, c.Flags.Bool("drop"))
         },
     })
 }
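
For context, the reworked command follows grumble's long-only flag pattern: f.BoolL registers a flag with no single-letter shorthand (invoked as `tables --drop`), and c.Flags.Bool reads it back by its long name. A minimal, self-contained sketch of that pattern, assuming the usual github.com/desertbit/grumble import path; the shell and command names are illustrative, not part of admin-cli:

    package main

    import (
        "fmt"

        "github.com/desertbit/grumble"
    )

    func main() {
        app := grumble.New(&grumble.Config{
            Name:        "demo-shell", // illustrative name, not the admin-cli binary
            Description: "sketch of grumble's long-only flag pattern",
        })
        app.AddCommand(&grumble.Command{
            Name: "tables",
            Help: "list tables",
            Flags: func(f *grumble.Flags) {
                // BoolL registers a long-only flag, so only `tables --drop` works
                f.BoolL("drop", false, "only show dropped table information")
            },
            Run: func(c *grumble.Context) error {
                // Flags.Bool looks a registered flag up by its long name
                fmt.Println("drop =", c.Flags.Bool("drop"))
                return nil
            },
        })
        grumble.Main(app)
    }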
4 changes: 0 additions & 4 deletions cmd/nodes_stat.go
@@ -29,13 +29,9 @@ func init() {
     shell.AddCommand(&grumble.Command{
         Name: "nodes-stat",
         Help: "query all nodes perf stat in the cluster",
-        Flags: func(f *grumble.Flags) {
-            f.Bool("d", "detail", false, "show node detail perf stats in cluster")
-        },
         Run: func(c *grumble.Context) error {
             return executor.ShowNodesStat(
                 pegasusClient,
-                c.Flags.Bool("detail"),
             )
         },
     })
105 changes: 59 additions & 46 deletions executor/list_nodes.go
@@ -28,6 +28,7 @@ import (
     "time"
 
     "github.com/XiaoMi/pegasus-go-client/idl/admin"
+    "github.com/XiaoMi/pegasus-go-client/idl/replication"
     "github.com/olekukonko/tablewriter"
     "github.com/pegasus-kv/admin-cli/tabular"
 )
@@ -44,13 +44,45 @@ type nodeInfoStruct struct {
 func ListNodes(client *Client) error {
     ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
     defer cancel()
-    listNodeResp, err := client.Meta.ListNodes(ctx, &admin.ListNodesRequest{
-        Status: admin.NodeStatus_NS_INVALID,
-    })
+
+    nodes, err := getNodesMap(client, ctx)
     if err != nil {
         return err
     }
+
+    listTableResp, errTable := client.Meta.ListApps(ctx, &admin.ListAppsRequest{
+        Status: admin.AppStatus_AS_AVAILABLE,
+    })
+    if errTable != nil {
+        return errTable
+    }
+    var tableNames []string
+    for _, info := range listTableResp.Infos {
+        tableNames = append(tableNames, info.AppName)
+    }
+
+    for _, tb := range tableNames {
+        queryCfgResp, err := client.Meta.QueryConfig(ctx, tb)
+        if err != nil {
+            return err
+        }
+        nodes, err = fillNodesInfo(nodes, queryCfgResp.Partitions)
+        if err != nil {
+            return err
+        }
+    }
+
+    printNodesInfo(client, nodes)
+    return nil
+}
+
+func getNodesMap(client *Client, ctx context.Context) (map[string]*nodeInfoStruct, error) {
+    listNodeResp, errNode := client.Meta.ListNodes(ctx, &admin.ListNodesRequest{
+        Status: admin.NodeStatus_NS_INVALID,
+    })
+    if errNode != nil {
+        return nil, errNode
+    }
     nodes := make(map[string]*nodeInfoStruct)
     for _, ninfo := range listNodeResp.Infos {
         n := client.Nodes.MustGetReplica(ninfo.Address.GetAddress())
@@ -59,11 +92,31 @@ func ListNodes(client *Client) error {
             Status: ninfo.Status.String(),
         }
     }
-    nodes, err = fillNodesInfoMap(client, nodes)
-    if err != nil {
-        return err
+    return nodes, nil
 }
 
+func fillNodesInfo(nodes map[string]*nodeInfoStruct, partitions []*replication.PartitionConfiguration) (map[string]*nodeInfoStruct, error) {
+    for _, part := range partitions {
+        n := nodes[part.Primary.GetAddress()]
+        if len(part.Primary.GetAddress()) != 0 {
+            if n != nil {
+                n.PrimaryCount++
+                n.ReplicaTotalCount++
+            } else {
+                return nil, fmt.Errorf("inconsistent state: nodes are updated")
+            }
+        }
+
+        for _, sec := range part.Secondaries {
+            n := nodes[sec.GetAddress()]
+            n.SecondaryCount++
+            n.ReplicaTotalCount++
+        }
+    }
+    return nodes, nil
+}
+
+func printNodesInfo(client *Client, nodes map[string]*nodeInfoStruct) {
     // render in tabular form
     var nodeList []interface{}
     for _, n := range nodes {
@@ -74,7 +127,6 @@ func ListNodes(client *Client) error {
     tabular.New(client, nodeList, func(t *tablewriter.Table) {
         footerWithTotalCount(t, nodeList)
     }).Render()
-    return nil
 }
 
 func nodesSortByAddress(nodes []interface{}) []interface{} {
@@ -108,42 +160,3 @@ func footerWithTotalCount(tbWriter *tablewriter.Table, nlist []interface{}) {
         fmt.Sprintf("%d", totalSecCnt),
     })
 }
-
-func fillNodesInfoMap(client *Client, nodes map[string]*nodeInfoStruct) (map[string]*nodeInfoStruct, error) {
-    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
-    defer cancel()
-    listAppsResp, err := client.Meta.ListApps(ctx, &admin.ListAppsRequest{
-        Status: admin.AppStatus_AS_AVAILABLE,
-    })
-    if err != nil {
-        return nil, err
-    }
-    var tableNames []string
-    for _, info := range listAppsResp.Infos {
-        tableNames = append(tableNames, info.AppName)
-    }
-
-    for _, tb := range tableNames {
-        // reuse the previous context, failed if the total time expires
-        queryCfgResp, err := client.Meta.QueryConfig(ctx, tb)
-        if err != nil {
-            return nil, err
-        }
-        for _, part := range queryCfgResp.Partitions {
-            n := nodes[part.Primary.GetAddress()]
-            if n == nil {
-                return nil, fmt.Errorf("inconsistent state: nodes are updated")
-            }
-            n.PrimaryCount++
-            n.ReplicaTotalCount++
-
-            for _, sec := range part.Secondaries {
-                n := nodes[sec.GetAddress()]
-                n.SecondaryCount++
-                n.ReplicaTotalCount++
-            }
-        }
-    }
-
-    return nodes, nil
-}
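
The net effect in this file: fillNodesInfoMap, which issued its own meta RPCs, is replaced by the pure helper fillNodesInfo, which only tallies primary and secondary placements from partition configurations the caller has already fetched. A simplified, self-contained sketch of that tallying step, using plain string addresses in place of the client's RPC-address types (the types and names below are illustrative, not the repository's):

    package main

    import "fmt"

    type nodeInfo struct {
        PrimaryCount      int
        SecondaryCount    int
        ReplicaTotalCount int
    }

    // partition stands in for the fields of replication.PartitionConfiguration
    // that the real helper reads; the simplification is ours.
    type partition struct {
        Primary     string
        Secondaries []string
    }

    func tally(nodes map[string]*nodeInfo, parts []partition) error {
        for _, p := range parts {
            if p.Primary != "" { // an empty primary means the partition currently has none
                n := nodes[p.Primary]
                if n == nil {
                    return fmt.Errorf("inconsistent state: nodes are updated")
                }
                n.PrimaryCount++
                n.ReplicaTotalCount++
            }
            for _, sec := range p.Secondaries {
                n := nodes[sec]
                n.SecondaryCount++
                n.ReplicaTotalCount++
            }
        }
        return nil
    }

    func main() {
        nodes := map[string]*nodeInfo{"n1": {}, "n2": {}, "n3": {}}
        parts := []partition{
            {Primary: "n1", Secondaries: []string{"n2", "n3"}},
            {Primary: "n2", Secondaries: []string{"n1", "n3"}},
        }
        if err := tally(nodes, parts); err != nil {
            panic(err)
        }
        for addr, n := range nodes {
            fmt.Printf("%s: primary=%d secondary=%d total=%d\n",
                addr, n.PrimaryCount, n.SecondaryCount, n.ReplicaTotalCount)
        }
    }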
114 changes: 84 additions & 30 deletions executor/list_tables.go
@@ -21,59 +21,113 @@ package executor
 
 import (
     "context"
-    "encoding/json"
     "fmt"
-    "github.com/pegasus-kv/admin-cli/tabular"
     "time"
 
     "github.com/XiaoMi/pegasus-go-client/idl/admin"
+    "github.com/pegasus-kv/admin-cli/executor/util"
+    "github.com/pegasus-kv/admin-cli/tabular"
 )
 
 // ListTables command.
-func ListTables(client *Client, useJSON bool) error {
+func ListTables(client *Client, showDropped bool) error {
     ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
     defer cancel()
 
+    var status admin.AppStatus
+    if showDropped {
+        status = admin.AppStatus_AS_DROPPED
+    } else {
+        status = admin.AppStatus_AS_AVAILABLE
+    }
+
     resp, err := client.Meta.ListApps(ctx, &admin.ListAppsRequest{
-        Status: admin.AppStatus_AS_AVAILABLE,
+        Status: status,
     })
     if err != nil {
         return err
     }
 
-    type tableStruct struct {
-        AppID int32 `json:"app_id"`
+    type dropTableStruct struct {
+        AppID int32 `json:"id"`
         Name string `json:"name"`
+        PartitionCount int32 `json:"partitionCount"`
+        DropTime string `json:"dropTime"`
+        ExpireTime string `json:"expireTime"`
+    }
+
+    type availableTableStruct struct {
+        AppID int32 `json:"id"`
+        Name string `json:"name"`
-        PartitionCount int32 `json:"partition_count"`
-        CreateTime string `json:"create_time"`
-        WReqRateLimit string `json:"wreq_rate-limit"`
-        WBytesRateLimit string `json:"wbytes_rate-limit"`
+        PartitionCount int32 `json:"partitionCount"`
+        FullHealthy int32 `json:"fullHealthy"`
+        UnHealthy int32 `json:"unhealthy"`
+        WriteUnHealthy int32 `json:"writeUnhealthy"`
+        ReadUnHealthy int32 `json:"readUnhealthy"`
+        CreateTime string `json:"createTime"`
+        WReqRateLimit string `json:"writeReqLimit"`
+        WBytesRateLimit string `json:"writeBytesLimit"`
     }
 
     var tbList []interface{}
     for _, tb := range resp.Infos {
-        createTime := "unknown"
-        if tb.CreateSecond != 0 {
-            createTime = time.Unix(tb.CreateSecond, 0).Format("2006-01-02")
+        if status == admin.AppStatus_AS_AVAILABLE {
+            fullHealthy, unHealthy, writeUnHealthy, readUnHealthy, err := getPartitionHealthyCount(client, tb)
+            if err != nil {
+                return err
+            }
+            tbList = append(tbList, availableTableStruct{
+                AppID: tb.AppID,
+                Name: tb.AppName,
+                FullHealthy: fullHealthy,
+                UnHealthy: unHealthy,
+                WriteUnHealthy: writeUnHealthy,
+                ReadUnHealthy: readUnHealthy,
+                PartitionCount: tb.PartitionCount,
+                CreateTime: util.FormatDate(tb.CreateSecond),
+                WReqRateLimit: tb.Envs["replica.write_throttling"],
+                WBytesRateLimit: tb.Envs["replica.write_throttling_by_size"],
+            })
+        } else if status == admin.AppStatus_AS_DROPPED {
+            tbList = append(tbList, dropTableStruct{
+                AppID: tb.AppID,
+                Name: tb.AppName,
+                DropTime: util.FormatDate(tb.DropSecond),
+                ExpireTime: util.FormatDate(tb.ExpireSecond),
+                PartitionCount: tb.PartitionCount,
+            })
         }
-        throttlingQPS := tb.Envs["replica.write_throttling"]
-        throttlingBytes := tb.Envs["replica.write_throttling_by_size"]
-        tbList = append(tbList, tableStruct{
-            AppID: tb.AppID,
-            Name: tb.AppName,
-            PartitionCount: tb.PartitionCount,
-            CreateTime: createTime,
-            WReqRateLimit: throttlingQPS,
-            WBytesRateLimit: throttlingBytes,
-        })
     }
 
-    if useJSON {
-        // formats into JSON
-        outputBytes, _ := json.MarshalIndent(tbList, "", " ")
-        fmt.Fprintln(client, string(outputBytes))
-        return nil
-    }
-
-    // formats into tabular
     tabular.Print(client, tbList)
     return nil
 }
+
+// return (FullHealthy, UnHealthy, WriteUnHealthy, ReadUnHealthy, Err)
+func getPartitionHealthyCount(client *Client, table *admin.AppInfo) (int32, int32, int32, int32, error) {
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+    defer cancel()
+    resp, err := client.Meta.QueryConfig(ctx, table.AppName)
+    if err != nil {
+        return 0, 0, 0, 0, err
+    }
+
+    var fullHealthy, unHealthy, writeUnHealthy, readUnHealthy int32
+    for _, partition := range resp.Partitions {
+        var replicaCnt int32
+        if partition.Primary == nil {
+            writeUnHealthy++
+            readUnHealthy++
+        } else {
+            replicaCnt = int32(len(partition.Secondaries) + 1)
+            if replicaCnt >= partition.MaxReplicaCount {
+                fullHealthy++
+            } else if replicaCnt < 2 {
+                writeUnHealthy++
+            }
+        }
+    }
+    unHealthy = table.PartitionCount - fullHealthy
+    return fullHealthy, unHealthy, writeUnHealthy, readUnHealthy, nil
+}
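
To make getPartitionHealthyCount's classification concrete: with MaxReplicaCount = 3, a partition with a primary and two secondaries counts as fully healthy; one with a primary and a single secondary is neither fully healthy nor write-unhealthy, so it surfaces only in the unHealthy total (PartitionCount minus fullHealthy); one with no primary increments both writeUnHealthy and readUnHealthy. A simplified sketch of the same counting rule, where partitionCfg is an illustrative stand-in for the thrift-generated PartitionConfiguration:

    package main

    import "fmt"

    type partitionCfg struct {
        HasPrimary      bool
        SecondaryCount  int32
        MaxReplicaCount int32
    }

    func healthyCounts(parts []partitionCfg) (fullHealthy, unHealthy, writeUnHealthy, readUnHealthy int32) {
        for _, p := range parts {
            if !p.HasPrimary {
                writeUnHealthy++
                readUnHealthy++
                continue
            }
            replicaCnt := p.SecondaryCount + 1 // primary plus its secondaries
            if replicaCnt >= p.MaxReplicaCount {
                fullHealthy++
            } else if replicaCnt < 2 {
                writeUnHealthy++
            }
        }
        // everything that is not fully healthy counts toward the unhealthy total
        unHealthy = int32(len(parts)) - fullHealthy
        return
    }

    func main() {
        parts := []partitionCfg{
            {HasPrimary: true, SecondaryCount: 2, MaxReplicaCount: 3}, // fully healthy
            {HasPrimary: true, SecondaryCount: 1, MaxReplicaCount: 3}, // unhealthy only
            {HasPrimary: false, MaxReplicaCount: 3},                   // read- and write-unhealthy
        }
        fmt.Println(healthyCounts(parts)) // 1 2 1 1
    }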
6 changes: 3 additions & 3 deletions executor/nodes_stat.go
@@ -63,13 +63,13 @@ Request:
 counter: replica*app.pegasus*multi_put_bytes
 `
 
-func ShowNodesStat(client *Client, detail bool) error {
+func ShowNodesStat(client *Client) error {
     nodesStats := util.GetNodeStat(client.Perf)
-    printNodesStatsTabular(client, nodesStats, detail)
+    printNodesStatsTabular(client, nodesStats)
     return nil
 }
 
-func printNodesStatsTabular(client *Client, nodes map[string]*aggregate.NodeStat, detail bool) {
+func printNodesStatsTabular(client *Client, nodes map[string]*aggregate.NodeStat) {
     t := tabular.NewTemplate(nodeStatsTemplate)
     t.SetCommonColumns([]string{"Node"}, func(rowData interface{}) []string {
         node := rowData.(*aggregate.NodeStat)
44 changes: 13 additions & 31 deletions executor/table_partitions.go
@@ -21,6 +21,7 @@ package executor
 
 import (
     "context"
+    "fmt"
     "strings"
     "time"
 
@@ -37,6 +38,17 @@ func ShowTablePartitions(client *Client, tableName string) error {
         return err
     }
 
+    nodes, err := getNodesMap(client, ctx)
+    if err != nil {
+        return err
+    }
+    nodes, err = fillNodesInfo(nodes, resp.Partitions)
+    if err != nil {
+        return err
+    }
+    fmt.Println("[PartitionCount]")
+    printNodesInfo(client, nodes)
+
     type partitionStruct struct {
         Pidx int32 `json:"pidx"`
         PrimaryAddr string `json:"primary"`
@@ -61,37 +73,7 @@ func ShowTablePartitions(client *Client, tableName string) error {
         partitions = append(partitions, p)
     }
 
+    fmt.Println("[PartitionDistribution]")
     tabular.Print(client, partitions)
 
-    // TODO(wutao): this piece of code is repeated with list-nodes, which also calculates replica count distribution among nodes.
-    type nodeStruct struct {
-        Address string `json:"address"`
-        PrimaryCount int `json:"primary"`
-        SecondaryCount int `json:"secondary"`
-        ReplicaCount int `json:"replica"`
-    }
-    nodesMap := make(map[string]*nodeStruct)
-    for _, partition := range resp.Partitions {
-        nodesMap[partition.Primary.GetAddress()] = &nodeStruct{Address: partition.Primary.GetAddress()}
-        for _, sec := range partition.Secondaries {
-            nodesMap[sec.GetAddress()] = &nodeStruct{Address: sec.GetAddress()}
-        }
-    }
-    for _, partition := range resp.Partitions {
-        nodesMap[partition.Primary.GetAddress()].PrimaryCount++
-        nodesMap[partition.Primary.GetAddress()].ReplicaCount++
-        for _, sec := range partition.Secondaries {
-            nodesMap[sec.GetAddress()].SecondaryCount++
-            nodesMap[sec.GetAddress()].ReplicaCount++
-        }
-    }
-    var nodeList []interface{}
-    for _, n := range nodesMap {
-        n.Address = client.Nodes.MustGetReplica(n.Address).CombinedAddr()
-        nodeList = append(nodeList, *n)
-    }
-    nodeList = nodesSortByAddress(nodeList)
-    tabular.Print(client, nodeList)
 
     return nil
 }