/
root.go
284 lines (244 loc) · 9.73 KB
/
root.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package cmd
import (
	"errors"
	"os"
	"regexp"
	"time"

	"s3-kinesis-replay/json"
	"s3-kinesis-replay/kinesis"
	"s3-kinesis-replay/replay"
	"s3-kinesis-replay/s3"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	Kinesis "github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
	S3 "github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)
// rootCmd is the application entrypoint. Configuration is validated by
// validateConfig (installed as the Args validator) before Run executes.
// Run builds the s3 archive, the kinesis producer and the configured parser,
// hands the archive scan and producer stream to the parser, and then blocks
// on producer.Wait before logging completion.
var rootCmd = &cobra.Command{
	Use:   "s3-kinesis-replay",
	Short: "a cli for replaying historical kinesis messages via an s3 archive",
	Args:  validateConfig,
	Run: func(cmd *cobra.Command, args []string) {
		// create application logger
		log := createLogger()

		// create aws session; failures are reported through the application
		// logger (like every other constructor) instead of panicking
		sess, err := session.NewSession()
		if err != nil {
			log.WithError(err).Fatalln("error creating aws session")
		}

		// create s3 client, downloader and archive
		s3client := createS3Client(sess)
		archive := createArchive(log, s3client, createDownloader(s3client))

		// create kinesis client and producer
		producer := createProducer(log, createKinesisClient(sess))

		// create parser for the configured format
		var parser replay.Parser
		switch format := viper.GetString("parser.format"); format {
		case "json":
			parser = createJSONParser(log)
		default:
			log.WithField("format", format).Fatalln("invalid format")
		}

		// bootstrap application: parse the archive scan into the producer
		// stream, then wait for the producer to finish
		parser.Parse(archive.Scan(), producer.Stream())
		producer.Wait()
		log.Infoln("replay completed")
	},
}
// createArchive builds the s3-backed replay.Archive from the "s3.*"
// configuration keys, wiring in the given client and downloader.
//
// The optional keys s3.prefix, s3.start_after and s3.stop_at are copied into
// the config only when viper reports them as set; otherwise the values
// produced by s3.NewArchiveConfig are left in place. An error from
// s3.NewArchive is logged as fatal.
func createArchive(log logrus.FieldLogger, client s3iface.S3API, downloader s3.Downloader) replay.Archive {
	cfg := s3.NewArchiveConfig()
	cfg.Bucket = viper.GetString("s3.bucket")
	cfg.Concurrency = viper.GetInt("s3.concurrency")
	cfg.Client = client
	cfg.Downloader = downloader
	cfg.Log = log.WithField("package", "s3")

	optional := []struct {
		key string
		dst *string
	}{
		{key: "s3.prefix", dst: &cfg.Prefix},
		{key: "s3.start_after", dst: &cfg.StartAfter},
		{key: "s3.stop_at", dst: &cfg.StopAt},
	}
	for _, opt := range optional {
		if viper.IsSet(opt.key) {
			*opt.dst = viper.GetString(opt.key)
		}
	}

	archive, err := s3.NewArchive(cfg)
	if err != nil {
		log.WithError(err).Fatalln("error creating archive service")
	}
	return archive
}
// createDownloader wraps client in an s3manager downloader and returns it as
// the s3.Downloader interface that createArchive consumes.
func createDownloader(client s3iface.S3API) s3.Downloader {
	downloader := s3manager.NewDownloaderWithClient(client)
	return downloader
}
// createJSONParser builds a json replay.Parser from the "json.*" and
// "parser.*" configuration keys.
//
// parser.delimiter and parser.replace are optional, user-supplied regular
// expressions. An expression that does not compile, or a parser.replace
// without a parser.replace_with, is logged as fatal. This matches the
// handling of json.NewParser errors rather than panicking or silently
// continuing.
func createJSONParser(log logrus.FieldLogger) replay.Parser {
	config := json.NewParserConfig()
	config.Log = log.WithField("package", "json")
	config.PartitionKey = viper.GetString("json.partition_key")
	config.Schema = viper.GetString("json.schema")
	if viper.IsSet("json.concurrency") {
		config.Concurrency = viper.GetInt("json.concurrency")
	}

	if delimiter := viper.GetString("parser.delimiter"); delimiter != "" {
		// regexp.Compile instead of MustCompile: the pattern comes from the
		// user, so a typo should be a clean error, not a panic
		re, err := regexp.Compile(delimiter)
		if err != nil {
			log.WithError(err).WithField("delimiter", delimiter).Fatalln("error creating json parser")
		}
		config.Delimiter = re
	}

	if replace := viper.GetString("parser.replace"); replace != "" {
		replaceWith := viper.GetString("parser.replace_with")
		if replaceWith == "" {
			// the requirement is stated by the error itself; stop instead of
			// building a parser with an empty replacement
			log.WithError(errors.New("if replace is defined, replace_with must also be defined")).
				Fatalln("error creating json parser")
		}
		re, err := regexp.Compile(replace)
		if err != nil {
			log.WithError(err).WithField("replace", replace).Fatalln("error creating json parser")
		}
		config.Replace = re
		config.ReplaceWith = replaceWith
	}

	parser, err := json.NewParser(config)
	if err != nil {
		log.WithError(err).Fatalln("error creating json parser")
	}
	return parser
}
// createKinesisClient returns a kinesis client built from sess. Non-empty
// "kinesis.endpoint" and "kinesis.region" values are passed as an extra
// aws.Config layered on top of the session configuration.
func createKinesisClient(sess *session.Session) kinesisiface.KinesisAPI {
	overrides := aws.NewConfig()

	endpoint := viper.GetString("kinesis.endpoint")
	if endpoint != "" {
		overrides = overrides.WithEndpoint(endpoint)
	}

	region := viper.GetString("kinesis.region")
	if region != "" {
		overrides = overrides.WithRegion(region)
	}

	return Kinesis.New(sess, overrides)
}
// createLogger configures the global logrus logger from the "log.level" and
// "log.format" configuration keys. It returns a logger scoped with
// name=s3-kinesis-replay.
//
// Accepted levels are debug, info, warn, error and fatal, and an empty level
// defaults to info. Any other level panics, because no logger exists yet to
// report the problem. A log.format of "json" selects the JSON formatter; any
// other value selects the text formatter.
func createLogger() logrus.FieldLogger {
	// set logging verbosity
	levels := map[string]logrus.Level{
		"debug": logrus.DebugLevel,
		"info":  logrus.InfoLevel,
		"warn":  logrus.WarnLevel,
		"error": logrus.ErrorLevel,
		"fatal": logrus.FatalLevel,
	}
	name := viper.GetString("log.level")
	if name == "" {
		// an unconfigured level previously panicked on every run
		name = "info"
	}
	level, ok := levels[name]
	if !ok {
		panic("invalid configuration: log.level")
	}
	logrus.SetLevel(level)

	// set formatter; the format has its own key (reading log.level here made
	// json output impossible to select)
	if viper.GetString("log.format") == "json" {
		logrus.SetFormatter(&logrus.JSONFormatter{})
	} else {
		logrus.SetFormatter(&logrus.TextFormatter{})
	}

	// create scoped logger
	return logrus.WithField("name", "s3-kinesis-replay")
}
// createProducer builds the kinesis replay.Producer that writes to the
// "kinesis.stream_name" stream through client.
//
// The backoff_interval, backoff_max_interval and buffer_window durations are
// applied only when non-zero; otherwise the values produced by
// kinesis.NewProducerConfig are kept. An error from kinesis.NewProducer is
// logged as fatal.
func createProducer(log logrus.FieldLogger, client kinesisiface.KinesisAPI) replay.Producer {
	cfg := kinesis.NewProducerConfig()
	cfg.Client = client
	cfg.Log = log.WithField("package", "kinesis")
	cfg.StreamName = viper.GetString("kinesis.stream_name")

	durations := []struct {
		key string
		dst *time.Duration
	}{
		{key: "kinesis.backoff_interval", dst: &cfg.BackoffInterval},
		{key: "kinesis.backoff_max_interval", dst: &cfg.BackoffMaxInterval},
		{key: "kinesis.buffer_window", dst: &cfg.BufferWindow},
	}
	for _, d := range durations {
		if v := viper.GetDuration(d.key); v != time.Duration(0) {
			*d.dst = v
		}
	}

	producer, err := kinesis.NewProducer(cfg)
	if err != nil {
		log.WithError(err).Fatalln("error creating producer")
	}
	return producer
}
// createS3Client returns an s3 client built from sess. Non-empty
// "s3.endpoint" and "s3.region" values are passed as an extra aws.Config
// layered on top of the session configuration.
func createS3Client(sess *session.Session) s3iface.S3API {
	overrides := aws.NewConfig()

	endpoint := viper.GetString("s3.endpoint")
	if endpoint != "" {
		overrides = overrides.WithEndpoint(endpoint)
	}

	region := viper.GetString("s3.region")
	if region != "" {
		overrides = overrides.WithRegion(region)
	}

	return S3.New(sess, overrides)
}
// Execute runs the root command. If the command returns an error (for
// example a validateConfig failure), the process exits with status 1 so
// scripts can detect the failure. Cobra prints the error itself unless
// SilenceErrors is set on the command.
func Execute() {
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}
// init registers the root command's flags and binds each to its viper
// configuration key. Because of the binding, a setting can come from a flag
// or from any other viper source.
func init() {
	flags := rootCmd.Flags()

	// bind maps a viper key to a registered flag. BindPFlag fails only when
	// the flag lookup returns nil, i.e. a misspelled flag name below, which is
	// a programming error.
	bind := func(key, name string) {
		if err := viper.BindPFlag(key, flags.Lookup(name)); err != nil {
			panic(err)
		}
	}

	// json parser
	flags.Int("json-concurrency", 4, "json parser concurrency")
	bind("json.concurrency", "json-concurrency")
	flags.String("partition-key", "", "json parser partition key path")
	bind("json.partition_key", "partition-key")
	flags.String("json-schema", "", "json parser schema path")
	bind("json.schema", "json-schema")

	// kinesis producer; durations use Duration flags so malformed values
	// are rejected at parse time instead of silently becoming zero
	flags.Duration("kinesis-backoff-interval", 0, "kinesis backoff interval (e.g. 500ms)")
	bind("kinesis.backoff_interval", "kinesis-backoff-interval")
	flags.Duration("kinesis-backoff-max-interval", 0, "kinesis max backoff interval (e.g. 30s)")
	bind("kinesis.backoff_max_interval", "kinesis-backoff-max-interval")
	flags.Duration("kinesis-buffer-window", 0, "kinesis buffer window size (e.g. 1s)")
	bind("kinesis.buffer_window", "kinesis-buffer-window")
	flags.String("kinesis-endpoint", "", "kinesis endpoint override")
	bind("kinesis.endpoint", "kinesis-endpoint")
	flags.String("kinesis-region", "", "kinesis region override")
	bind("kinesis.region", "kinesis-region")
	flags.String("stream-name", "", "target kinesis stream name")
	bind("kinesis.stream_name", "stream-name")

	// logging
	flags.String("log-level", "info", "log verbosity level (debug, info, warn, error, fatal)")
	bind("log.level", "log-level")
	flags.String("log-format", "text", "log output format (text or json)")
	bind("log.format", "log-format")

	// parser
	flags.String("format", "", "parser format")
	bind("parser.format", "format")
	flags.String("delimiter", "", "optional delimiter regexp")
	bind("parser.delimiter", "delimiter")
	flags.String("replace", "", "optional replace regexp")
	bind("parser.replace", "replace")
	flags.String("replace-with", "", "optional replacement string")
	bind("parser.replace_with", "replace-with")

	// s3 archive
	flags.String("bucket", "", "s3 archive bucket name")
	bind("s3.bucket", "bucket")
	flags.Int("s3-concurrency", 4, "s3 download concurrency")
	bind("s3.concurrency", "s3-concurrency")
	flags.String("s3-endpoint", "", "s3 endpoint override")
	bind("s3.endpoint", "s3-endpoint")
	flags.String("prefix", "", "s3 archive prefix")
	bind("s3.prefix", "prefix")
	flags.String("s3-region", "", "s3 archive region")
	bind("s3.region", "s3-region")
	flags.String("start-after", "", "s3 archive start-after key")
	bind("s3.start_after", "start-after")
	flags.String("stop-at", "", "s3 archive stop-at key")
	bind("s3.stop_at", "stop-at")
}
// validateConfig checks the runtime configuration before the command runs.
// It is installed as the root command's Args validator, so a non-nil error
// aborts execution. Positional args are not inspected.
//
// It returns an error when parser.format is not "json" (the only supported
// format), or when kinesis.stream_name or s3.bucket is empty.
func validateConfig(cmd *cobra.Command, args []string) error {
	// validate parser format
	if viper.GetString("parser.format") != "json" {
		return errors.New("invalid parser format")
	}

	// Required values are checked for non-empty content rather than with
	// IsSet. Depending on the viper version, IsSet can report a key bound to
	// a flag with a default as set even when no value was supplied, and an
	// explicitly empty name is never usable.
	if viper.GetString("kinesis.stream_name") == "" {
		return errors.New("kinesis stream name is required")
	}
	if viper.GetString("s3.bucket") == "" {
		return errors.New("s3 bucket is required")
	}
	return nil
}