/
consume.go
137 lines (113 loc) · 3.48 KB
/
consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
Copyright © 2023 VECI Group Tech S.L.
This file is part of kafka-client.
*/
package cmd
import (
"fmt"
"strings"
"time"
"github.com/bluekiri/kafka-client/internal/handlers"
"github.com/bluekiri/kafka-client/internal/ioutils"
"github.com/bluekiri/kafka-client/internal/sliceutils"
"github.com/Shopify/sarama"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)
const (
consumeExample = "kafka-client consume localhost:9092 my_topic"
consumeShort = "Consumes messages from a Kafka topic."
consumeLong = `consume command uses bootstrap_servers to get the brokers of the Kafka cluster
and consume messages from the indicated topic printing them to stdout unless a
filename is provided by the --output flag.`
)
// consumeCmd represents the consume command
var consumeCmd = &cobra.Command{
Use: "consume bootstrap_servers topic",
Short: consumeShort,
Long: consumeLong,
Example: consumeExample,
Args: cobra.ExactArgs(2),
ValidArgsFunction: completeClustersAndTopic(2),
RunE: consume,
}
func init() {
rootCmd.AddCommand(consumeCmd)
consumeCmd.Flags().StringP(output, "o", "", "write to file instead of stdout.")
consumeCmd.MarkFlagFilename(output)
addFormatFlags(consumeCmd)
}
func consume(cmd *cobra.Command, args []string) error {
cmd.SilenceUsage = true
// Get the command arguments and flags
kafkaBrokers := strings.Split(resolveCluster(args[0]), ",")
kafkaTopic := args[1]
kafkaClientID := viper.GetString(clientID)
outputFilename, _ := cmd.Flags().GetString(output)
duration := viper.GetDuration(duration)
reportingPeriod := time.Duration(1) * time.Second
if viper.GetBool(quiet) {
reportingPeriod = -1
}
// Get the formatter
formatter, err := getFormatter(cmd, outputFilename)
if err != nil {
return err
}
// Get the writer (sink of messages)
writer, err := ioutils.Create(outputFilename)
if err != nil {
return err
}
defer writer.Close()
// Kafka configuration
config := sarama.NewConfig()
config.ClientID = kafkaClientID
config.Consumer.Return.Errors = true
// Get the Kafka client
client, err := sarama.NewClient(kafkaBrokers, config)
if err != nil {
return err
}
defer client.Close()
// Check topic exists
if topics, err := client.Topics(); err != nil {
return err
} else if !sliceutils.Contains(topics, kafkaTopic) {
return fmt.Errorf("kafka: topic %s does not exist", kafkaTopic)
}
// Create the handlers
inputHandler, err := handlers.NewKafkaInputHandler(client, kafkaTopic)
if err != nil {
return nil
}
outputHandler, err := handlers.NewFileOutputHandler(inputHandler.Messages(), formatter.NewWriter(writer))
if err != nil {
return nil
}
reportingHandler := handlers.NewReportingHandler(logger, reportingPeriod)
// Get the interruptable context
ctx := interruptableContext(cmd.Context(), duration)
// Create the error group
g, ctx := errgroup.WithContext(ctx)
// Start reporting goroutine
g.Go(reportingHandler.Start(inputHandler.Progress(), outputHandler.Progress()))
// Start the output goroutine
g.Go(outputHandler.Run)
// Start the input goroutine
g.Go(inputHandler.Start(ctx))
// Log start
logOutput := ""
if outputFilename != "" {
logOutput = fmt.Sprintf(" to '%s'", outputFilename)
}
logger.Printf(
"consuming messages from cluster %s topic '%s'%s",
strings.Join(kafkaBrokers, ","), kafkaTopic,
logOutput,
)
logger.Printf("press ctrl-c to exit")
// Return the error group error
return adaptError(g.Wait())
}