Skip to content

Commit

Permalink
more package refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Apr 2, 2020
1 parent 0b1ae1b commit 3d024e6
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 34 deletions.
36 changes: 18 additions & 18 deletions cmd/kaf/config.go
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"os"

"github.com/birdayz/kaf"
"github.com/birdayz/kaf/pkg/config"
"github.com/manifoldco/promptui"
"github.com/spf13/cobra"
)
Expand All @@ -31,7 +31,7 @@ var configUseCmd = &cobra.Command{
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
name := args[0]
if err := config.SetCurrentCluster(name); err != nil {
if err := cfg.SetCurrentCluster(name); err != nil {
fmt.Printf("Cluster with name %v not found\n", name)
} else {
fmt.Printf("Switched to cluster \"%v\".\n", name)
Expand All @@ -47,7 +47,7 @@ var configLsCmd = &cobra.Command{
if !noHeaderFlag {
fmt.Println("NAME")
}
for _, cluster := range config.Clusters {
for _, cluster := range cfg.Clusters {
fmt.Println(cluster.Name)
}
},
Expand All @@ -58,7 +58,7 @@ var configSelectCluster = &cobra.Command{
Short: "Interactively select a cluster",
Run: func(cmd *cobra.Command, args []string) {
var clusterNames []string
for _, cluster := range config.Clusters {
for _, cluster := range cfg.Clusters {
clusterNames = append(clusterNames, cluster.Name)
}
p := promptui.Select{
Expand All @@ -74,7 +74,7 @@ var configSelectCluster = &cobra.Command{
// How to have selection on currently selected cluster?

// TODO copy pasta
if err := config.SetCurrentCluster(selected); err != nil {
if err := cfg.SetCurrentCluster(selected); err != nil {
fmt.Printf("Cluster with selected %v not found\n", selected)
}
},
Expand All @@ -86,18 +86,18 @@ var configAddClusterCmd = &cobra.Command{
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
name := args[0]
for _, cluster := range config.Clusters {
for _, cluster := range cfg.Clusters {
if cluster.Name == name {
errorExit("Could not add cluster: cluster with name '%v' exists already.", name)
}
}

config.Clusters = append(config.Clusters, &kaf.Cluster{
cfg.Clusters = append(cfg.Clusters, &config.Cluster{
Name: name,
Brokers: brokersFlag,
SchemaRegistryURL: schemaRegistryURL,
})
err := config.Write()
err := cfg.Write()
if err != nil {
errorExit("Unable to write config: %v\n", err)
}
Expand All @@ -109,14 +109,14 @@ var configImportCmd = &cobra.Command{
Use: "import [ccloud]",
Short: "Import configurations into the $HOME/.kaf/config file",
Run: func(cmd *cobra.Command, args []string) {
if path, err := kaf.TryFindCcloudConfigFile(); err == nil {
if path, err := config.TryFindCcloudConfigFile(); err == nil {
fmt.Printf("Detected Confluent Cloud config in file %v\n", path)
if username, password, broker, err := kaf.ParseConfluentCloudConfig(path); err == nil {
if username, password, broker, err := config.ParseConfluentCloudConfig(path); err == nil {

newCluster := &kaf.Cluster{
newCluster := &config.Cluster{
Name: "ccloud",
Brokers: []string{broker},
SASL: &kaf.SASL{
SASL: &config.SASL{
Username: username,
Password: password,
Mechanism: "PLAIN",
Expand All @@ -125,23 +125,23 @@ var configImportCmd = &cobra.Command{
}

var found bool
for i, newCluster := range config.Clusters {
for i, newCluster := range cfg.Clusters {
if newCluster.Name == "confluent cloud" {
found = true
config.Clusters[i] = newCluster
cfg.Clusters[i] = newCluster
break
}
}

if !found {
fmt.Println("Wrote new entry to config file")
config.Clusters = append(config.Clusters, newCluster)
cfg.Clusters = append(cfg.Clusters, newCluster)
}

if config.CurrentCluster == "" {
config.CurrentCluster = newCluster.Name
if cfg.CurrentCluster == "" {
cfg.CurrentCluster = newCluster.Name
}
config.Write()
cfg.Write()

}
}
Expand Down
9 changes: 4 additions & 5 deletions cmd/kaf/group.go
Expand Up @@ -17,13 +17,12 @@ import (
"sync"

"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/streams"
"github.com/spf13/cobra"

"strconv"

"time"

"github.com/birdayz/kaf"
)

func init() {
Expand Down Expand Up @@ -316,7 +315,7 @@ var groupDescribeCmd = &cobra.Command{
}
} else {
switch d := decodedUserData.(type) {
case kaf.SubscriptionInfo:
case streams.SubscriptionInfo:
fmt.Fprintf(w, "\f\t\tMetadata:\t\n")
fmt.Fprintf(w, "\t\t UUID:\t0x%v\n", hex.EncodeToString(d.UUID))
fmt.Fprintf(w, "\t\t UserEndpoint:\t%v\n", d.UserEndpoint)
Expand Down Expand Up @@ -397,11 +396,11 @@ func IsASCIIPrintable(s string) bool {

func tryDecodeUserData(protocol string, raw []byte) (data interface{}, err error) {
// Interpret userdata here
decoder := kaf.NewDecoder(raw)
decoder := streams.NewDecoder(raw)

switch protocol {
case "stream":
subscriptionInfo := kaf.SubscriptionInfo{}
subscriptionInfo := streams.SubscriptionInfo{}
err = subscriptionInfo.Decode(decoder)
if err != nil {
return nil, err
Expand Down
14 changes: 7 additions & 7 deletions cmd/kaf/kaf.go
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/Shopify/sarama"
"github.com/spf13/cobra"

"github.com/birdayz/kaf"
"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/config"
"github.com/birdayz/kaf/pkg/proto"
)

Expand Down Expand Up @@ -118,8 +118,8 @@ func main() {
}
}

var config kaf.Config
var currentCluster *kaf.Cluster
var cfg config.Config
var currentCluster *config.Cluster

var (
brokersFlag []string
Expand Down Expand Up @@ -151,20 +151,20 @@ var setupProtoDescriptorRegistry = func(cmd *cobra.Command, args []string) {

func onInit() {
var err error
config, err = kaf.ReadConfig(cfgFile)
cfg, err = config.ReadConfig(cfgFile)
if err != nil {
errorExit("Invalid config: %v", err)
}

config.ClusterOverride = clusterOverride
cfg.ClusterOverride = clusterOverride

cluster := config.ActiveCluster()
cluster := cfg.ActiveCluster()
if cluster != nil {
// Use active cluster from config
currentCluster = cluster
} else {
// Create sane default if not configured
currentCluster = &kaf.Cluster{
currentCluster = &config.Cluster{
Brokers: []string{"localhost:9092"},
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config.go
@@ -1,4 +1,4 @@
package kaf
package config

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/confluent_cloud.go
@@ -1,4 +1,4 @@
package kaf
package config

import (
"errors"
Expand Down
2 changes: 1 addition & 1 deletion pkg/streams/decoder.go
@@ -1,4 +1,4 @@
package kaf
package streams

import (
"encoding/binary"
Expand Down
2 changes: 1 addition & 1 deletion pkg/streams/subscription_info.go
@@ -1,4 +1,4 @@
package kaf
package streams

type SubscriptionInfo struct {
Version int32
Expand Down

0 comments on commit 3d024e6

Please sign in to comment.