Skip to content

Commit

Permalink
antctl: add multicluster subcommand (#3341)
Browse files Browse the repository at this point in the history
Add a new subcommand for antrea multicluster and add get verb for query
resources. We could list or get mc resource by antctl mc get <Resource>.

Example:

NAMESPACE                NAME                                                         KIND
antrea-mcs-ns            antrea-multicluster-test-east-nginx-endpoints                ResourceImport
antrea-mcs-ns            antrea-multicluster-test-east-nginx-service                  ResourceImport
antrea-mcs-ns            antrea-multicluster-test-west-nginx-endpoints                ResourceImport
antrea-mcs-ns            antrea-multicluster-test-west-nginx-service                  ResourceImport

In the future, other verb like create or delete also could be added to
the sub-command mc if it is nesscessary.

Signed-off-by: hjiajing <hjiajing@vmware.com>
  • Loading branch information
hjiajing committed Mar 21, 2022
1 parent 3c2bfe0 commit 9879f4c
Show file tree
Hide file tree
Showing 10 changed files with 639 additions and 260 deletions.
7 changes: 7 additions & 0 deletions pkg/antctl/antctl.go
Expand Up @@ -24,6 +24,7 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
fallbackversion "antrea.io/antrea/pkg/antctl/fallback/version"
"antrea.io/antrea/pkg/antctl/raw/featuregates"
"antrea.io/antrea/pkg/antctl/raw/multicluster"
"antrea.io/antrea/pkg/antctl/raw/proxy"
"antrea.io/antrea/pkg/antctl/raw/supportbundle"
"antrea.io/antrea/pkg/antctl/raw/traceflow"
Expand Down Expand Up @@ -531,6 +532,12 @@ var CommandList = &commandList{
supportController: true,
commandGroup: get,
},
{
cobraCommand: multicluster.GetCmd,
supportAgent: false,
supportController: false,
commandGroup: mc,
},
},
codec: scheme.Codecs,
}
Expand Down
271 changes: 14 additions & 257 deletions pkg/antctl/command_definition.go
Expand Up @@ -23,16 +23,14 @@ import (
"sort"
"strconv"
"strings"
"text/tabwriter"

"github.com/spf13/cobra"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/antctl/output"
"antrea.io/antrea/pkg/antctl/runtime"
"antrea.io/antrea/pkg/antctl/transform/common"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/controller/networkpolicy"
)
Expand All @@ -46,8 +44,7 @@ const (
)

const (
maxTableOutputColumnLength int = 50
sortByEffectivePriority string = "effectivePriority"
sortByEffectivePriority string = "effectivePriority"
)

// commandGroup is used to group commands, it could be specified in commandDefinition.
Expand Down Expand Up @@ -76,6 +73,7 @@ const (
flat commandGroup = iota
get
query
mc
)

var groupCommands = map[commandGroup]*cobra.Command{
Expand All @@ -89,6 +87,11 @@ var groupCommands = map[commandGroup]*cobra.Command{
Short: "Execute a user-provided query",
Long: "Execute a user-provided query",
},
mc: {
Use: "mc",
Short: "Sub-commands of multi-cluster feature",
Long: "Sub-commands of multi-cluster feature",
},
}

type endpointResponder interface {
Expand Down Expand Up @@ -380,176 +383,6 @@ func (cd *commandDefinition) decode(r io.Reader, single bool) (interface{}, erro
return reflect.Indirect(ref).Interface(), nil
}

func jsonEncode(obj interface{}, output *bytes.Buffer) error {
if err := json.NewEncoder(output).Encode(obj); err != nil {
return fmt.Errorf("error when encoding data in json: %w", err)
}
return nil
}

func (cd *commandDefinition) jsonOutput(obj interface{}, writer io.Writer) error {
var output bytes.Buffer
if err := jsonEncode(obj, &output); err != nil {
return fmt.Errorf("error when encoding data in json: %w", err)
}

var prettifiedBuf bytes.Buffer
err := json.Indent(&prettifiedBuf, output.Bytes(), "", " ")
if err != nil {
return fmt.Errorf("error when formatting outputing in json: %w", err)
}
_, err = io.Copy(writer, &prettifiedBuf)
if err != nil {
return fmt.Errorf("error when outputing in json format: %w", err)
}
return nil
}

func (cd *commandDefinition) yamlOutput(obj interface{}, writer io.Writer) error {
var jsonObj interface{}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(obj); err != nil {
return fmt.Errorf("error when outputing in yaml format: %w", err)
}
// Comment copied from: sigs.k8s.io/yaml
// We are using yaml.Unmarshal here (instead of json.Unmarshal) because the
// Go JSON library doesn't try to pick the right number type (int, float,
// etc.) when unmarshalling to interface{}, it just picks float64
// universally. go-yaml does go through the effort of picking the right
// number type, so we can preserve number type throughout this process.
if err := yaml.Unmarshal(buf.Bytes(), &jsonObj); err != nil {
return fmt.Errorf("error when outputing in yaml format: %w", err)
}
if err := yaml.NewEncoder(writer).Encode(jsonObj); err != nil {
return fmt.Errorf("error when outputing in yaml format: %w", err)
}
return nil
}

// respTransformer collects output fields in original transformedResponse
// and flattens them. respTransformer realizes this by turning obj into
// JSON and unmarshalling it.
// E.g. agent's transformedVersionResponse will only have two fields after
// transforming: agentVersion and antctlVersion.
func respTransformer(obj interface{}) (interface{}, error) {
var jsonObj bytes.Buffer
if err := json.NewEncoder(&jsonObj).Encode(obj); err != nil {
return nil, fmt.Errorf("error when encoding data in json: %w", err)
}
jsonStr := jsonObj.String()

var target interface{}
if err := json.Unmarshal([]byte(jsonStr), &target); err != nil {
return nil, fmt.Errorf("error when unmarshalling data in json: %w", err)
}
return target, nil
}

// tableOutputForGetCommands formats the table output for "get" commands.
func (cd *commandDefinition) tableOutputForGetCommands(obj interface{}, writer io.Writer) error {
var list []common.TableOutput
if reflect.TypeOf(obj).Kind() == reflect.Slice {
s := reflect.ValueOf(obj)
if s.Len() == 0 || s.Index(0).Interface() == nil {
var buffer bytes.Buffer
buffer.WriteString("\n")
if _, err := io.Copy(writer, &buffer); err != nil {
return fmt.Errorf("error when copy output into writer: %w", err)
}
return nil
}
if _, ok := s.Index(0).Interface().(common.TableOutput); !ok {
return cd.tableOutput(obj, writer)
}
for i := 0; i < s.Len(); i++ {
ele := s.Index(i)
list = append(list, ele.Interface().(common.TableOutput))
}
} else {
ele, ok := obj.(common.TableOutput)
if !ok {
return cd.tableOutput(obj, writer)
}
list = []common.TableOutput{ele}
}

// Get the elements and headers of table.
args := list[0].GetTableHeader()
rows := make([][]string, len(list)+1)
rows[0] = list[0].GetTableHeader()
for i, element := range list {
rows[i+1] = element.GetTableRow(maxTableOutputColumnLength)
}

if list[0].SortRows() {
// Sort the table rows according to columns in order.
body := rows[1:]
sort.Slice(body, func(i, j int) bool {
for k := range body[i] {
if body[i][k] != body[j][k] {
return body[i][k] < body[j][k]
}
}
return true
})
}
// Construct the table.
numRows, numCols := len(list)+1, len(args)
widths := getColumnWidths(numRows, numCols, rows)
return constructTable(numRows, numCols, widths, rows, writer)
}

func getColumnWidths(numRows int, numCols int, rows [][]string) []int {
widths := make([]int, numCols)
if numCols == 1 {
// Do not limit the column length for a single column table.
// This is for the case a single column table can have long rows which cannot
// fit into a single line (one example is the ovsflows outputs).
widths[0] = 0
} else {
// Get the width of every column.
for j := 0; j < numCols; j++ {
width := len(rows[0][j])
for i := 1; i < numRows; i++ {
if len(rows[i][j]) == 0 {
rows[i][j] = "<NONE>"
}
if width < len(rows[i][j]) {
width = len(rows[i][j])
}
}
widths[j] = width
if j != 0 {
widths[j]++
}
}
}
return widths
}

func constructTable(numRows int, numCols int, widths []int, rows [][]string, writer io.Writer) error {
var buffer bytes.Buffer
for i := 0; i < numRows; i++ {
for j := 0; j < numCols; j++ {
val := ""
if j != 0 {
val = " " + val
}
val += rows[i][j]
if widths[j] > 0 {
val += strings.Repeat(" ", widths[j]-len(val))
}
buffer.WriteString(val)
}
buffer.WriteString("\n")
}
if _, err := io.Copy(writer, &buffer); err != nil {
return fmt.Errorf("error when copy output into writer: %w", err)
}

return nil
}

// tableOutputForQueryEndpoint implements printing sub tables (list of tables) for each response, utilizing constructTable
// with multiplicity.
func (cd *commandDefinition) tableOutputForQueryEndpoint(obj interface{}, writer io.Writer) error {
Expand Down Expand Up @@ -580,8 +413,8 @@ func (cd *commandDefinition) tableOutputForQueryEndpoint(obj interface{}, writer
rows := append(header, body...)
sortRows(rows)
numRows, numCol := len(rows), len(rows[0])
widths := getColumnWidths(numRows, numCol, rows)
if err := constructTable(numRows, numCol, widths, rows, writer); err != nil {
widths := output.GetColumnWidths(numRows, numCol, rows)
if err := output.ConstructTable(numRows, numCol, widths, rows, writer); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -655,82 +488,6 @@ func (cd *commandDefinition) tableOutputForQueryEndpoint(obj interface{}, writer
return nil
}

func (cd *commandDefinition) tableOutput(obj interface{}, writer io.Writer) error {
target, err := respTransformer(obj)
if err != nil {
return fmt.Errorf("error when transforming obj: %w", err)
}

list, multiple := target.([]interface{})
var args []string
if multiple {
for _, el := range list {
m := el.(map[string]interface{})
for k := range m {
args = append(args, k)
}
// break after one iteration intentionally (we are just retrieving attribute
// names to use as the table header in the output)
break // nolint:staticcheck
}
} else {
m, _ := target.(map[string]interface{})
for k := range m {
args = append(args, k)
}
}

var buffer bytes.Buffer
for _, arg := range args {
buffer.WriteString(arg)
buffer.WriteString("\t")
}
attrLine := buffer.String()

var valLines []string
if multiple {
for _, el := range list {
m := el.(map[string]interface{})
buffer.Reset()
for _, k := range args {
var output bytes.Buffer
if err = jsonEncode(m[k], &output); err != nil {
return fmt.Errorf("error when encoding data in json: %w", err)
}
buffer.WriteString(strings.Trim(output.String(), "\"\n"))
buffer.WriteString("\t")
}
valLines = append(valLines, buffer.String())
}
} else {
buffer.Reset()
m, _ := target.(map[string]interface{})
for _, k := range args {
var output bytes.Buffer
if err = jsonEncode(m[k], &output); err != nil {
return fmt.Errorf("error when encoding: %w", err)
}
buffer.WriteString(strings.Trim(output.String(), "\"\n"))
buffer.WriteString("\t")
}
valLines = append(valLines, buffer.String())
}

var b bytes.Buffer
w := tabwriter.NewWriter(&b, 15, 0, 1, ' ', 0)
fmt.Fprintln(w, attrLine)
for _, line := range valLines {
fmt.Fprintln(w, line)
}
w.Flush()

if _, err = io.Copy(writer, &b); err != nil {
return fmt.Errorf("error when copy output into writer: %w", err)
}

return nil
}

// output reads bytes from the resp and outputs the data to the writer in desired
// format. If the AddonTransform is set, it will use the function to transform
// the data first. It will try to output the resp in the format ft specified after
Expand Down Expand Up @@ -766,18 +523,18 @@ func (cd *commandDefinition) output(resp io.Reader, writer io.Writer, ft formatt
// Output structure data in format
switch ft {
case jsonFormatter:
return cd.jsonOutput(obj, writer)
return output.JsonOutput(obj, writer)
case yamlFormatter:
return cd.yamlOutput(obj, writer)
return output.YamlOutput(obj, writer)
case tableFormatter:
if cd.commandGroup == get {
return cd.tableOutputForGetCommands(obj, writer)
return output.TableOutputForGetCommands(obj, writer)
} else if cd.commandGroup == query {
if cd.controllerEndpoint.nonResourceEndpoint.path == "/endpoint" {
return cd.tableOutputForQueryEndpoint(obj, writer)
}
} else {
return cd.tableOutput(obj, writer)
return output.TableOutput(obj, writer)
}
default:
return fmt.Errorf("unsupported format type: %v", ft)
Expand Down
4 changes: 2 additions & 2 deletions pkg/antctl/command_definition_test.go
Expand Up @@ -31,6 +31,7 @@ import (

"antrea.io/antrea/pkg/agent/apiserver/handlers/agentinfo"
"antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface"
"antrea.io/antrea/pkg/antctl/output"
"antrea.io/antrea/pkg/antctl/runtime"
"antrea.io/antrea/pkg/antctl/transform/addressgroup"
"antrea.io/antrea/pkg/antctl/transform/appliedtogroup"
Expand Down Expand Up @@ -289,9 +290,8 @@ default nginx-6db489d4b7-vgv7v Interface 127.0.0.1 07-16-76-00-02-86 port
},
} {
t.Run(tc.name, func(t *testing.T) {
opt := &commandDefinition{}
var outputBuf bytes.Buffer
err := opt.tableOutputForGetCommands(tc.rawResponseData, &outputBuf)
err := output.TableOutputForGetCommands(tc.rawResponseData, &outputBuf)
fmt.Println(outputBuf.String())
assert.Nil(t, err)
assert.Equal(t, tc.expected, outputBuf.String())
Expand Down
3 changes: 2 additions & 1 deletion pkg/antctl/command_list.go
Expand Up @@ -63,7 +63,8 @@ func (cl *commandList) applyToRootCommand(root *cobra.Command, client AntctlClie

for _, cmd := range cl.rawCommands {
if (runtime.Mode == runtime.ModeAgent && cmd.supportAgent) ||
(runtime.Mode == runtime.ModeController && cmd.supportController) {
(runtime.Mode == runtime.ModeController && cmd.supportController) ||
(!runtime.InPod && cmd.commandGroup == mc) {
if groupCommand, ok := groupCommands[cmd.commandGroup]; ok {
groupCommand.AddCommand(cmd.cobraCommand)
} else {
Expand Down

0 comments on commit 9879f4c

Please sign in to comment.